Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(#3491): support window union multiple join in request mode #3493

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions cases/query/window_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -833,3 +833,71 @@ cases:
200, 1, 1, 1
300, 0, 0, 0
400, 1, 0, 0

- id: 23
sql: |
select
gp_id,
count(gp_id) over w as cnt,
-- t2 matches and t3 not matches
count_where(gp_id, not is_null(lcond) and is_null(cond)) over w as feat1,
from (select id as gp_id, 0 as lcond, 0 as cond, cast(90000 as timestamp) as ts from request)
window w as (
union (select t1.gp_id, t2.cond as lcond, t3.cond as cond, t1.ts from
t1 last join t2 on t1.gp_id = t2.account
last join t3 on t1.cond = t3.cond)
partition by gp_id order by ts
rows between unbounded preceding and current row
exclude current_row instance_not_in_window
)
inputs:
- name: request
columns: ["id int"]
indexs: ['idx:id']
data: |
100
200
300
400
- name: t1
columns:
- gp_id int
- cond int
- ts timestamp
indexs:
- idx2:gp_id:ts
data: |
100, 201, 10000
100, 201, 10000
200, 203, 10000
400, 204, 10000
400, 205, 10000
- name: t2
columns:
- account int
- cond int
- ts timestamp
indexs: ["idx1:account:ts"]
data: |
100, 201, 1000
200, 203, 4000
400, 209, 4000
- name: t3
columns:
- cond int
- ts timestamp
indexs: ["idx3:cond:ts"]
data: |
201, 1000
208, 1000
expect:
columns:
- gp_id int
- cnt int64
- feat1 int64
order: gp_id
data: |
100, 2, 0
200, 1, 1
300, 0, 0
400, 2, 2
7 changes: 6 additions & 1 deletion hybridse/include/vm/schemas_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class SchemaSource {
size_t size() const;
void Clear();

std::string ToString() const;
// Returns a string rendering of this schema source; the exact layout is
// defined by the implementation in schemas_context.cc.
std::string DebugString() const;
// Stream-insertion support: writes sc.DebugString() to `os` and returns `os`.
friend std::ostream& operator<<(std::ostream& os, const SchemaSource& sc) { return os << sc.DebugString(); }

private:
bool CheckSourceSetIndex(size_t idx) const;
Expand Down Expand Up @@ -246,6 +247,10 @@ class SchemasContext {
void BuildTrivial(const std::vector<const codec::Schema*>& schemas);
void BuildTrivial(const std::string& default_db, const std::vector<const type::TableDef*>& tables);

// Returns a string rendering of this context; the exact layout is defined by
// the implementation in schemas_context.cc.
std::string DebugString() const;

// Stream-insertion support: writes sc.DebugString() to `os` and returns `os`.
friend std::ostream& operator<<(std::ostream& os, const SchemasContext& sc) { return os << sc.DebugString(); }

private:
bool IsColumnAmbiguous(const std::string& column_name) const;

Expand Down
13 changes: 9 additions & 4 deletions hybridse/src/passes/physical/group_and_sort_optimized.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,8 @@
return false;
}
PhysicalSimpleProjectNode* new_simple_op = nullptr;
Status status = plan_ctx_->CreateOp<PhysicalSimpleProjectNode>(
&new_simple_op, new_depend, simple_project->project());
Status status =
plan_ctx_->CreateOp<PhysicalSimpleProjectNode>(&new_simple_op, new_depend, simple_project->project());
if (!status.isOK()) {
LOG(WARNING) << "Fail to create simple project op: " << status;
return false;
Expand Down Expand Up @@ -442,11 +442,16 @@
index_key, right_key, sort, &new_depend)) {
return false;
}
if (!ResetProducer(plan_ctx_, request_join, 0, new_depend)) {
PhysicalRequestJoinNode* new_join = nullptr;
auto s = plan_ctx_->CreateOp<PhysicalRequestJoinNode>(&new_join, new_depend, request_join->GetProducer(1),
request_join->join(),
request_join->output_right_only());
if (!s.isOK()) {
LOG(WARNING) << "Fail to create new request join op: " << s;

Check warning on line 450 in hybridse/src/passes/physical/group_and_sort_optimized.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/passes/physical/group_and_sort_optimized.cc#L450

Added line #L450 was not covered by tests
return false;
}

*new_in = request_join;
*new_in = new_join;
return true;
}
return false;
Expand Down
1 change: 1 addition & 0 deletions hybridse/src/vm/runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ static bool IsPartitionProvider(vm::PhysicalOpNode* n) {
switch (n->GetOpType()) {
case kPhysicalOpSimpleProject:
case kPhysicalOpRename:
case kPhysicalOpRequestJoin:
return IsPartitionProvider(n->GetProducer(0));
case kPhysicalOpDataProvider:
return dynamic_cast<vm::PhysicalDataProviderNode*>(n)->provider_type_ == kProviderTypePartition;
Expand Down
40 changes: 37 additions & 3 deletions hybridse/src/vm/schemas_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/

#include "vm/schemas_context.h"

#include <set>

#include "absl/strings/str_join.h"
#include "passes/physical/physical_pass.h"
#include "vm/physical_op.h"

Expand Down Expand Up @@ -121,14 +124,18 @@
return schema_ == nullptr ? 0 : schema_->size();
}

std::string SchemaSource::ToString() const {
// output: {db}.{table}[ {name}:{type}:{id}, ... ]
std::string SchemaSource::DebugString() const {

Check warning on line 128 in hybridse/src/vm/schemas_context.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/schemas_context.cc#L128

Added line #L128 was not covered by tests
std::stringstream ss;
ss << source_db_ << "." << source_name_ << "[";

Check warning on line 130 in hybridse/src/vm/schemas_context.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/schemas_context.cc#L130

Added line #L130 was not covered by tests
for (size_t i = 0; i < column_ids_.size(); ++i) {
ss << schema_->Get(i).name() << ":" << node::TypeName(schema_->Get(i).type()) << ":";

Check warning on line 132 in hybridse/src/vm/schemas_context.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/schemas_context.cc#L132

Added line #L132 was not covered by tests
ss << "#" << std::to_string(column_ids_[i]);
if (i < column_ids_.size() - 1) {
ss << ", ";
}
}
ss << "]";

Check warning on line 138 in hybridse/src/vm/schemas_context.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/schemas_context.cc#L138

Added line #L138 was not covered by tests
return ss.str();
}

Expand Down Expand Up @@ -173,7 +180,7 @@
db_name = source->GetSourceDB();
}
std::string rel_name = child->GetName();
if (rel_name.empty()&& !source->GetSourceName().empty()) {
if (rel_name.empty() && !source->GetSourceName().empty()) {
rel_name = source->GetSourceName();
}
new_source->SetSourceDBAndTableName(db_name, rel_name);
Expand Down Expand Up @@ -751,7 +758,34 @@
this->Build();
}

RowParser::RowParser(const SchemasContext* schema_ctx) : schema_ctx_(schema_ctx) {
std::string SchemasContext::DebugString() const {
std::stringstream ss;
ss << absl::StrCat("{", root_db_name_, ",", root_relation_name_, ",", default_db_name_, ", ",
absl::StrJoin(schema_sources_, ",", [](std::string* out, const SchemaSource* source) {
absl::StrAppend(out, source->DebugString());
}));

Check warning on line 766 in hybridse/src/vm/schemas_context.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/schemas_context.cc#L761-L766

Added lines #L761 - L766 were not covered by tests
ss << ", id_map={"
<< absl::StrJoin(column_id_map_, ",", [](std::string* out, decltype(column_id_map_)::const_reference e) {
absl::StrAppend(out, e.first, "=(", e.second.first, ",", e.second.second, ")");
}) << "}, ";

Check warning on line 770 in hybridse/src/vm/schemas_context.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/schemas_context.cc#L768-L770

Added lines #L768 - L770 were not covered by tests
ss << "name_map={"
<< absl::StrJoin(column_name_map_, ",",
[](std::string* out, decltype(column_name_map_)::const_reference e) {
absl::StrAppend(
out, e.first, "=[",
absl::StrJoin(e.second, ",",
[](std::string* out, decltype(e.second)::const_reference ref) {
absl::StrAppend(out, "(", ref.first, ",", ref.second, ")");
}),

Check warning on line 779 in hybridse/src/vm/schemas_context.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/schemas_context.cc#L772-L779

Added lines #L772 - L779 were not covered by tests
"]");
})
<< "}";
ss << "}";
return ss.str();

Check warning on line 784 in hybridse/src/vm/schemas_context.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/schemas_context.cc#L781-L784

Added lines #L781 - L784 were not covered by tests
}

RowParser::RowParser(const SchemasContext* schema_ctx)
: schema_ctx_(schema_ctx) {
for (size_t i = 0; i < schema_ctx_->GetSchemaSourceSize(); ++i) {
auto source = schema_ctx_->GetSchemaSource(i);
row_view_list_.push_back(codec::RowView(*source->GetSchema()));
Expand Down
Loading