Skip to content

Commit

Permalink
feat: support const project online (#3376)
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd committed Sep 14, 2023
1 parent 07f6862 commit 330d171
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 103 deletions.
42 changes: 12 additions & 30 deletions cases/query/const_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

debugs: []
version: 0.5.0
# FIXME: support request procedure for const project
# requires GetTablet impl for non-table procedure
cases:
- id: 0
desc: select const number
mode: request-unsupport
mode: procedure-unsupport
db: db1
sql: |
select 1 as id, 2 as col1, 3.3 as col2;
Expand All @@ -29,7 +31,7 @@ cases:
- [1, 2, 3.3]
- id: 1
desc: select str
mode: request-unsupport
mode: procedure-unsupport
db: db1
sql: |
select 1 as id, "hello_world" as col1;
Expand All @@ -41,7 +43,7 @@ cases:
- [1, "hello_world"]
- id: 2
desc: const substr pos len
mode: request-unsupport
mode: procedure-unsupport
db: db1
sql: |
select 1 as id, substring("hello_world", 3, 6) as col1;
Expand All @@ -53,7 +55,7 @@ cases:
- [1, "llo_wo"]
- id: 3
desc: const substr pos
mode: request-unsupport
mode: procedure-unsupport
db: db1
sql: |
select 1 as id, substring("hello_world", 3) as col1;
Expand All @@ -65,7 +67,7 @@ cases:
- [1, "llo_world"]
- id: 4
desc: const concat 1
mode: request-unsupport
mode: procedure-unsupport
db: db1
sql: |
select 1 as id, concat("hello", "world", "abc") as col1;
Expand All @@ -76,13 +78,8 @@ cases:
- [1, "helloworldabc"]
- id: 5
desc: cast常量 using CAST operator
mode: request-unsupport
mode: procedure-unsupport
db: db1
inputs:
- columns: ["c1 int", "c2 string", "c5 bigint"]
indexs: ["index1:c1:c5"]
rows:
- [1, "2020-05-22 10:43:40", 1]
sql: |
select CAST (10 as int) as c1, CAST (10 as bigint) as c2, CAST (10 as float) as c3, CAST (10 as double) as c4, CAST (1590115460000 as timestamp) as c5, CAST ("2020-05-20" as date) as c6, CAST (10 as string) as c7;
expect:
Expand All @@ -91,13 +88,8 @@ cases:
- [10, 10, 10.0, 10.0, 1590115460000, '2020-05-20', "10"]
- id: 6
desc: cast NULL常量 using CAST operator
mode: request-unsupport
mode: procedure-unsupport
db: db1
inputs:
- columns: ["c1 int", "c2 string", "c5 bigint"]
indexs: ["index1:c1:c5"]
rows:
- [1, "2020-05-22 10:43:40", 1]
sql: |
select CAST (NULL as int) as c1, CAST (NULL as bigint) as c2, CAST (NULL as float) as c3, CAST (NULL as double) as c4, CAST (NULL as timestamp) as c5, CAST (NULL as date) as c6, CAST (NULL as string) as c7;
expect:
Expand All @@ -106,13 +98,8 @@ cases:
- [NULL, NULL, NULL, NULL, NULL, NULL, NULL]
- id: 7
desc: cast常量 using type() function
mode: request-unsupport
mode: procedure-unsupport
db: db1
inputs:
- columns: ["c1 int", "c2 string", "c5 bigint"]
indexs: ["index1:c1:c5"]
rows:
- [1, "2020-05-22 10:43:40", 1]
sql: |
select int(10) as c1, bigint(10) as c2, float(10) as c3, double(10) as c4, timestamp(1590115460000) as c5, date("2020-05-20") as c6, string(10) as c7;
expect:
Expand All @@ -121,13 +108,8 @@ cases:
- [10, 10, 10.0, 10.0, 1590115460000, '2020-05-20', "10"]
- id: 8
desc: cast NULL常量 using type(NULL) function
mode: request-unsupport
mode: procedure-unsupport
db: db1
inputs:
- columns: ["c1 int", "c2 string", "c5 bigint"]
indexs: ["index1:c1:c5"]
rows:
- [1, "2020-05-22 10:43:40", 1]
sql: |
select int(NULL) as c1, bigint(NULL) as c2, float(NULL) as c3, double(NULL) as c4, timestamp(NULL) as c5, date(NULL) as c6, string(NULL) as c7;
expect:
Expand All @@ -136,7 +118,7 @@ cases:
- [NULL, NULL, NULL, NULL, NULL, NULL, NULL]
- id: 9
desc: differnt const node type
mode: request-unsupport
mode: procedure-unsupport
db: db1
sql: |
select true c1, int16(3) c2, 13 c3, 10.0 c4, 'a string' c5, date(timestamp(1590115420000)) c6, timestamp(1590115420000) c7;
Expand Down
8 changes: 4 additions & 4 deletions hybridse/include/case/sql_case.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ class SqlCase {
static std::string GenRand(const std::string& prefix) {
return prefix + std::to_string(rand() % 10000000 + 1); // NOLINT
}
absl::StatusOr<std::string> BuildCreateSpSqlFromInput(int32_t input_idx, absl::string_view sql,
const std::set<size_t>& common_idx);
absl::StatusOr<std::string> BuildCreateSpSqlFromSchema(const type::TableDef& table, absl::string_view select_sql,
const std::set<size_t>& common_idx);
absl::StatusOr<std::string> BuildCreateSpSql(absl::string_view sql, const std::set<size_t>& common_idx,
std::optional<int32_t> input_idx);
absl::StatusOr<std::string> BuildCreateSpSql(absl::string_view select_sql, const std::set<size_t>& common_idx,
std::optional<const type::TableDef*> table);

friend std::ostream& operator<<(std::ostream& output, const SqlCase& thiz);
static bool IS_PERF() {
Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/sdk/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct Status {
cm.append("] ");
cm.append(msg);
return cm;
};
}

int code;
// msg use prepend and append, it's better to use absl::Cord, but we may directly use msg
Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/vm/physical_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ class PhysicalConstProjectNode : public PhysicalOpNode {
fn_infos_.push_back(&project_.fn_info());
}
virtual ~PhysicalConstProjectNode() {}
virtual void Print(std::ostream &output, const std::string &tab) const;
void Print(std::ostream &output, const std::string &tab) const override;
static PhysicalConstProjectNode *CastFrom(PhysicalOpNode *node);
const ColumnProjects &project() const { return project_; }

Expand Down
53 changes: 31 additions & 22 deletions hybridse/src/case/sql_case.cc
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,9 @@ const std::string SqlCase::case_name() const {
}
bool SqlCase::ExtractInputTableDef(type::TableDef& table,
int32_t input_idx) const {
if (inputs_.size() <= input_idx) {
return false;
}
return ExtractInputTableDef(inputs_[input_idx], table);
}
bool SqlCase::ExtractInputTableDef(const TableInfo& input,
Expand Down Expand Up @@ -1639,35 +1642,41 @@ void InitCases(std::string yaml_path, std::vector<SqlCase>& cases, // NOLINT
const std::vector<std::string>& filters) {
SqlCase::CreateSqlCasesFromYaml(hybridse::sqlcase::FindSqlCaseBaseDirPath(), yaml_path, cases, filters);
}
absl::StatusOr<std::string> SqlCase::BuildCreateSpSqlFromInput(int32_t input_idx,
absl::string_view select_sql,
const std::set<size_t>& common_idx) {
type::TableDef table;
if (!ExtractInputTableDef(table, input_idx)) {
return absl::FailedPreconditionError("Fail to extract table schema");
}
absl::StatusOr<std::string> SqlCase::BuildCreateSpSql(absl::string_view select_sql, const std::set<size_t>& common_idx,
std::optional<int32_t> input_idx) {
if (input_idx.has_value()) {
type::TableDef table;
if (!ExtractInputTableDef(table, input_idx.value())) {
return absl::FailedPreconditionError("Fail to extract table schema");
}

return BuildCreateSpSqlFromSchema(table, select_sql, common_idx);
return BuildCreateSpSql(select_sql, common_idx, &table);
}
std::optional<const type::TableDef*> tab = {};
return BuildCreateSpSql(select_sql, common_idx, tab);
}

absl::StatusOr<std::string> SqlCase::BuildCreateSpSqlFromSchema(const type::TableDef& table,
absl::string_view select_sql,
const std::set<size_t>& common_idx) {
auto sql_view = absl::StripAsciiWhitespace(select_sql);
std::string query_stmt(sql_view);
if (query_stmt.back() != ';') {
absl::StatusOr<std::string> SqlCase::BuildCreateSpSql(absl::string_view select_sql, const std::set<size_t>& common_idx,
std::optional<const type::TableDef*> tab) {
auto sql_view = absl::StripAsciiWhitespace(select_sql);
std::string query_stmt(sql_view);
if (query_stmt.back() != ';') {
absl::StrAppend(&query_stmt, ";");
}

std::string sql = absl::Substitute("CREATE PROCEDURE $0 (\n", sp_name_);
for (int i = 0; i < table.columns_size(); i++) {
auto column = table.columns(i);
if (!common_idx.empty() && common_idx.count(i)) {
absl::StrAppend(&sql, "const ");
}
absl::SubstituteAndAppend(&sql, "$0 $1", column.name(), TypeString(column.type()));
if (i < table.columns_size() - 1) {
absl::StrAppend(&sql, ",\n");
if (tab.has_value()) {
auto table = tab.value();

for (int i = 0; i < table->columns_size(); i++) {
auto column = table->columns(i);
if (!common_idx.empty() && common_idx.count(i)) {
absl::StrAppend(&sql, "const ");
}
absl::SubstituteAndAppend(&sql, "$0 $1", column.name(), TypeString(column.type()));
if (i < table->columns_size() - 1) {
absl::StrAppend(&sql, ",\n");
}
}
}
absl::SubstituteAndAppend(&sql, ")\nBEGIN\n$0\nEND;", query_stmt);
Expand Down
4 changes: 2 additions & 2 deletions hybridse/src/case/sql_case_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ TEST_F(SqlCaseTest, BuildCreateSpSqlFromInputTest) {
sql_case.sp_name_ = "sp";
std::string sql = " select c1, c2, c3, c4 from t1 ";
std::string sp_sql = "";
auto s = sql_case.BuildCreateSpSqlFromInput(0, sql, {});
auto s = sql_case.BuildCreateSpSql(sql, {}, 0);
ASSERT_TRUE(s.ok()) << s.status();
ASSERT_EQ(R"s(CREATE PROCEDURE sp (
c1 string,
Expand All @@ -1190,7 +1190,7 @@ END;)s",
sql_case.inputs_.push_back(input);
std::string sql = "select c1, c2, c3, c4 from t1;";
std::string sp_sql = "";
auto s = sql_case.BuildCreateSpSqlFromInput(0, sql, {0, 1, 3});
auto s = sql_case.BuildCreateSpSql(sql, {0, 1, 3}, 0);
ASSERT_TRUE(s.ok()) << s.status();
ASSERT_EQ(R"s(CREATE PROCEDURE sp1 (
const c1 string,
Expand Down
7 changes: 5 additions & 2 deletions hybridse/src/plan/planner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ absl::StatusOr<node::TablePlanNode*> Planner::IsTable(node::PlanNode *node) {
// - SELECT
// - JOIN
// - WINDOW
// - CONST PROJECT
// - UnSupport Ops::
// - CREATE TABLE
// - INSERT TABLE
Expand All @@ -500,8 +501,10 @@ absl::StatusOr<node::TablePlanNode*> Planner::IsTable(node::PlanNode *node) {
// - Not Impl
// - Order By
base::Status Planner::ValidateOnlineServingOp(node::PlanNode *node) {
CHECK_TRUE(nullptr != node, common::kNullInputPointer,
"Fail to validate request table: input node is null")
if (node == nullptr) {
// null is fine, e.g the const project
return {};
}
switch (node->type_) {
case node::kPlanTypeProject: {
auto project_node = dynamic_cast<node::ProjectPlanNode *>(node);
Expand Down
14 changes: 13 additions & 1 deletion hybridse/src/testing/engine_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,20 @@ class RequestEngineTestRunner : public EngineTestRunner {
std::string request_db_name = request_session->GetRequestDbName();
CHECK_TRUE(parameter_rows_.empty(), common::kUnSupport, "Request do not support parameterized query currently")
Row parameter = parameter_rows_.empty() ? Row() : parameter_rows_[0];
if (request_rows_.empty()) {
// send empty request, trigger e.g const project in request mode
CHECK_TRUE(request_name.empty() && request_db_name.empty(), common::kUnsupportSql,
"no request data for request table: <", request_db_name, ".", request_name, ">")
request_rows_.push_back(Row());
}
for (auto in_row : request_rows_) {
Row out_row;
int run_ret = request_session->Run(in_row, &out_row);
if (run_ret != 0) {
return_code_ = ENGINE_TEST_RET_EXECUTION_ERROR;
return Status(common::kRunError, "Run request session failed");
}
if (!has_batch_request) {
if (!has_batch_request && !request_name.empty()) {
CHECK_TRUE(AddRowIntoTable(request_db_name, request_name, in_row), common::kTablePutFailed,
"Fail add row into table ", request_db_name, ".", request_name);
}
Expand Down Expand Up @@ -423,6 +429,12 @@ class BatchRequestEngineTestRunner : public EngineTestRunner {
offset += row_num;
}
}

if (request_rows_.empty()) {
// batch request rows will empty for const projects
// workaround by add the one empty row
request_rows_.push_back(Row());
}
return Status::OK();
}

Expand Down
7 changes: 3 additions & 4 deletions hybridse/src/vm/runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1074,11 +1074,10 @@ std::shared_ptr<DataHandlerList> Runner::BatchRequestRun(RunnerContext& ctx) {
return cached;
}
}
std::shared_ptr<DataHandlerVector> outputs =
std::make_shared<DataHandlerVector>();

std::shared_ptr<DataHandlerVector> outputs = std::make_shared<DataHandlerVector>();
std::vector<std::shared_ptr<DataHandler>> inputs(producers_.size());
std::vector<std::shared_ptr<DataHandlerList>> batch_inputs(
producers_.size());
std::vector<std::shared_ptr<DataHandlerList>> batch_inputs(producers_.size());
for (size_t idx = producers_.size(); idx > 0; idx--) {
batch_inputs[idx - 1] = producers_[idx - 1]->BatchRequestRun(ctx);
}
Expand Down
9 changes: 6 additions & 3 deletions hybridse/src/vm/transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2300,7 +2300,9 @@ Status RequestModeTransformer::TransformProjectPlanOp(
"Input node or output node is null");

PhysicalOpNode* depend = nullptr;
CHECK_STATUS(TransformPlanOp(node->GetChildren()[0], &depend));
if (!node->GetChildren().empty() && nullptr != node->GetChildren()[0]) {
CHECK_STATUS(TransformPlanOp(node->GetChildren()[0], &depend));
}

CHECK_STATUS(CompleteProjectList(node, depend));

Expand Down Expand Up @@ -2468,8 +2470,6 @@ Status RequestModeTransformer::ValidateRequestTable(
return Status::OK();
}
case vm::kPhysicalOpConstProject: {
FAIL_STATUS(kPlanError,
"Non-support Const Project in request mode", in->GetTreeString());
break;
}
default: {
Expand All @@ -2487,6 +2487,9 @@ Status RequestModeTransformer::TransformProjectOp(
node::ProjectListNode* project_list, PhysicalOpNode* depend,
bool append_input, PhysicalOpNode** output) {
PhysicalOpNode* new_depend = depend;
if (nullptr == depend) {
return CreatePhysicalConstProjectNode(project_list, output);
}
if (nullptr != project_list->GetW()) {
CHECK_STATUS(TransformWindowOp(depend, project_list->GetW(), &new_depend));
}
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3740,12 +3740,16 @@ struct DeploymentEnv {
});
}

// A bacth request increase deployment cnt by 1
// yet may greatly impact deploy response time, if the batch size is huge
// maybe it requires a revision
void CallDeployProcedureBatch() {
hybridse::sdk::Status status;
std::shared_ptr<sdk::SQLRequestRow> rr = std::make_shared<sdk::SQLRequestRow>();
GetRequestRow(&rr, dp_name_);
auto common_column_indices = std::make_shared<sdk::ColumnIndicesSet>(rr->GetSchema());
auto common_column_indices = std::make_shared<sdk::ColumnIndicesSet>();
auto row_batch = std::make_shared<sdk::SQLRequestRowBatch>(rr->GetSchema(), common_column_indices);
ASSERT_TRUE(row_batch->AddRow(rr));
sr->CallSQLBatchRequestProcedure(db_, dp_name_, row_batch, &status);
ASSERT_TRUE(status.IsOK()) << status.msg << "\n" << status.trace;
}
Expand Down
8 changes: 4 additions & 4 deletions src/sdk/sql_cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, SqlSdkDistributeBatchRequestSinglePartitionT

TEST_P(SQLSDKBatchRequestQueryTest, SqlSdkDistributeBatchRequestProcedureTest) {
auto sql_case = GetParam();
if (!IsBatchRequestSupportMode(sql_case.mode())) {
if (!IsBatchRequestSupportMode(sql_case.mode()) || "procedure-unsupport" == sql_case.mode()) {
LOG(WARNING) << "Unsupport mode: " << sql_case.mode();
return;
}
Expand All @@ -1030,7 +1030,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, SqlSdkDistributeBatchRequestProcedureTest) {
TEST_P(SQLSDKQueryTest, SqlSdkDistributeRequestProcedureTest) {
auto sql_case = GetParam();
LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc();
if (!IsRequestSupportMode(sql_case.mode())) {
if (!IsRequestSupportMode(sql_case.mode()) || "procedure-unsupport" == sql_case.mode()) {
LOG(WARNING) << "Unsupport mode: " << sql_case.mode();
return;
}
Expand All @@ -1041,7 +1041,7 @@ TEST_P(SQLSDKQueryTest, SqlSdkDistributeRequestProcedureTest) {
}
TEST_P(SQLSDKBatchRequestQueryTest, SqlSdkDistributeBatchRequestProcedureAsyncTest) {
auto sql_case = GetParam();
if (!IsRequestSupportMode(sql_case.mode())) {
if (!IsBatchRequestSupportMode(sql_case.mode()) || "procedure-unsupport" == sql_case.mode()) {
LOG(WARNING) << "Unsupport mode: " << sql_case.mode();
return;
}
Expand All @@ -1059,7 +1059,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, SqlSdkDistributeBatchRequestProcedureAsyncTe
TEST_P(SQLSDKQueryTest, SqlSdkDistributeRequestProcedureAsyncTest) {
auto sql_case = GetParam();
LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc();
if (!IsRequestSupportMode(sql_case.mode())) {
if (!IsRequestSupportMode(sql_case.mode()) || "procedure-unsupport" == sql_case.mode()) {
LOG(WARNING) << "Unsupport mode: " << sql_case.mode();
return;
}
Expand Down
Loading

0 comments on commit 330d171

Please sign in to comment.