Skip to content

Commit

Permalink
fix: remove apply pass to the same physical node duplicately.
Browse files Browse the repository at this point in the history
  • Loading branch information
jingchen2222 committed Sep 23, 2021
1 parent 62b89c4 commit feb2d55
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 27 deletions.
2 changes: 0 additions & 2 deletions hybridse/examples/toydb/src/testing/toydb_engine_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ TEST_P(EngineTest, TestRequestEngine) {
TEST_P(EngineTest, TestBatchEngine) {
ParamType sql_case = GetParam();
EngineOptions options;
options.set_enable_batch_window_parallelization(true);
options.set_performance_sensitive(false);
LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc();
if (!boost::contains(sql_case.mode(), "batch-unsupport") &&
!boost::contains(sql_case.mode(), "rtidb-unsupport") &&
Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/vm/physical_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ class PhysicalOpNode : public node::NodeBase<PhysicalOpNode> {

virtual void PrintSchema() const;

virtual std::string SchemaToString() const;
virtual std::string SchemaToString(const std::string &tab = "") const;

const std::vector<PhysicalOpNode *> &GetProducers() const {
return producers_;
Expand Down
10 changes: 9 additions & 1 deletion hybridse/src/passes/physical/transform_up_physical_pass.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@ bool TransformUpPysicalPass::Apply(PhysicalOpNode* in, PhysicalOpNode** out) {
LOG(WARNING) << "fail to apply pass: input or output is null";
return false;
}
if (visited_ids_.find(in->GetNodeId()) != visited_ids_.end()) {
DLOG(INFO) << "Apply " << in->GetTypeName() << " node #" << in->GetNodeId() << " has been visited already.";
*out = in;
return visited_ids_[in->GetNodeId()];
}
auto producer = in->producers();
for (size_t j = 0; j < producer.size(); ++j) {
PhysicalOpNode* output = nullptr;
if (Apply(producer[j], &output)) {
if (!ResetProducer(plan_ctx_, in, j, output)) {
visited_ids_[in->GetNodeId()] = false;
return false;
}
}
Expand All @@ -40,10 +46,12 @@ bool TransformUpPysicalPass::Apply(PhysicalOpNode* in, PhysicalOpNode** out) {
Status status = in->InitSchema(plan_ctx_);
if (!status.isOK()) {
LOG(WARNING) << "Reset schema failed: " << status;
visited_ids_[in->GetNodeId()] = false;
return false;
}
in->FinishSchema();
return Transform(in, out);
visited_ids_[in->GetNodeId()] = Transform(in, out);
return visited_ids_[in->GetNodeId()];
}

bool ResetProducer(PhysicalPlanContext* plan_ctx, PhysicalOpNode* op,
Expand Down
1 change: 1 addition & 0 deletions hybridse/src/passes/physical/transform_up_physical_pass.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class TransformUpPysicalPass : public PhysicalPass {
node::NodeManager* node_manager_;
const std::string db_;
std::shared_ptr<Catalog> catalog_;
std::unordered_map<int, bool> visited_ids_;
};

} // namespace passes
Expand Down
3 changes: 2 additions & 1 deletion hybridse/src/planv2/ast_node_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ base::Status ConvertASTType(const zetasql::ASTType* ast_type, node::NodeManager*
}
return base::Status::OK();
}
/// Used to convert zetasql ASTExpression Node into our ExprNode
base::Status ConvertExprNode(const zetasql::ASTExpression* ast_expression, node::NodeManager* node_manager,
node::ExprNode** output) {
if (nullptr == ast_expression) {
*output = nullptr;
return base::Status::OK();
}
base::Status status;
// TODO(chenjing): support case when value and case when without value
switch (ast_expression->node_kind()) {
case zetasql::AST_STAR: {
*output = node_manager->MakeAllNode("");
Expand Down Expand Up @@ -359,6 +359,7 @@ base::Status ConvertExprNode(const zetasql::ASTExpression* ast_expression, node:
const zetasql::ASTParameterExpr* parameter_expr = ast_expression->GetAsOrNull<zetasql::ASTParameterExpr>();
CHECK_TRUE(nullptr != parameter_expr, common::kSqlAstError, "not an ASTParameterExpr")

// Only support anonymous parameter (e.g, ?) so far.
CHECK_TRUE(nullptr == parameter_expr->name(), common::kSqlAstError,
"Un-support Named Parameter Expression ", parameter_expr->name()->GetAsString());
*output = node_manager->MakeParameterExpr(parameter_expr->position());
Expand Down
12 changes: 6 additions & 6 deletions hybridse/src/vm/physical_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1081,9 +1081,9 @@ void PhysicalOpNode::PrintSchema() const {
std::cout << SchemaToString() << std::endl;
}

std::string PhysicalOpNode::SchemaToString() const {
std::string PhysicalOpNode::SchemaToString(const std::string& tab) const {
std::stringstream ss;
ss << "[";
ss << tab << "[";
if (!schemas_ctx_.GetName().empty()) {
ss << "name=" << schemas_ctx_.GetName() << ", ";
}
Expand All @@ -1093,11 +1093,11 @@ std::string PhysicalOpNode::SchemaToString() const {
ss << "]\n";

for (size_t i = 0; i < GetOutputSchemaSourceSize(); ++i) {
ss << "{\n";
ss << tab << "{\n";
const SchemaSource* schema_source = GetOutputSchemaSource(i);
const auto* schema = schema_source->GetSchema();
for (int32_t j = 0; j < schema->size(); j++) {
ss << " ";
ss << tab << " ";
const type::ColumnDef& column = schema->Get(j);
ss << "#" << schema_source->GetColumnID(j) << " " << column.name()
<< " " << type::Type_Name(column.type());
Expand All @@ -1107,7 +1107,7 @@ std::string PhysicalOpNode::SchemaToString() const {
}
ss << "\n";
}
ss << "} ";
ss << tab << "} ";
}
return ss.str();
}
Expand Down Expand Up @@ -1237,7 +1237,7 @@ base::Status PhysicalRequestUnionNode::InitSchema(PhysicalPlanContext* ctx) {
}

base::Status PhysicalRenameNode::InitSchema(PhysicalPlanContext* ctx) {
CHECK_TRUE(!producers_.empty(), common::kPlanError, "Empty request union");
CHECK_TRUE(!producers_.empty(), common::kPlanError, "Empty procedures");
schemas_ctx_.Clear();
schemas_ctx_.SetName(name_);
schemas_ctx_.Merge(0, producers_[0]->schemas_ctx());
Expand Down
15 changes: 2 additions & 13 deletions hybridse/src/vm/schemas_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,20 +259,9 @@ Status SchemasContext::ResolveColumnIndexByID(size_t column_id,
size_t* index) const {
CHECK_TRUE(this->CheckBuild(), kColumnNotFound,
"Schemas context is not fully build");
std::stringstream trace_msg;
auto find_iter = column_id_map_.find(column_id);
for(auto iter = column_id_map_.begin(); iter != column_id_map_.end(); iter++) {
trace_msg << "(#" << iter->first << ": " << iter->second.first << ", " << iter->second.second << "),";
}
trace_msg << "\ncolumn_name_map_:";
for(auto iter = column_name_map_.begin(); iter != column_name_map_.end(); iter++) {
trace_msg << iter->first << ", ";
}
trace_msg << "\n";

CHECK_TRUE(find_iter != column_id_map_.end(), kColumnNotFound,
"Fail to find column id #", column_id,
" in current schema context", trace_msg.str());
CHECK_TRUE(find_iter != column_id_map_.end(), kColumnNotFound, "Fail to find column id #", column_id,
" in current schema context")
*schema_idx = find_iter->second.first;
*index = find_iter->second.second;
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions hybridse/src/vm/transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1202,12 +1202,12 @@ void BatchModeTransformer::ApplyPasses(PhysicalOpNode* node,
}
}

std::string BatchModeTransformer::ExtractSchameName(PhysicalOpNode* in) {
std::string BatchModeTransformer::ExtractSchemaName(PhysicalOpNode* in) {
if (nullptr == in) {
return "";
}
if (kPhysicalOpSimpleProject == in->GetOpType()) {
return ExtractSchameName(in->GetProducer(0));
return ExtractSchemaName(in->GetProducer(0));
} else if (kPhysicalOpRename == in->GetOpType()) {
return dynamic_cast<PhysicalRenameNode*>(in)->name_;
} else if (kPhysicalOpDataProvider == in->GetOpType()) {
Expand Down
2 changes: 1 addition & 1 deletion hybridse/src/vm/transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class BatchModeTransformer {
bool isSourceFromTable(PhysicalOpNode* in);
Status ValidateTableProvider(PhysicalOpNode* physical_plan);
Status ValidatePartitionDataProvider(PhysicalOpNode* physical_plan);
std::string ExtractSchameName(PhysicalOpNode* physical_plan);
std::string ExtractSchemaName(PhysicalOpNode* in);
Status ValidateRequestDataProvider(PhysicalOpNode* physical_plan);
Status ValidateWindowIndexOptimization(const WindowOp& window,
PhysicalOpNode* in);
Expand Down

0 comments on commit feb2d55

Please sign in to comment.