Skip to content

Commit

Permalink
feat(connector): filter by group by (#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike authored Nov 27, 2023
1 parent 5dbe27e commit 850becc
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/streaming/clickhouse_connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (c *ClickhouseConnector) queryMeterView(ctx context.Context, namespace stri
From: params.From,
To: params.To,
Subject: params.Subject,
FilterGroupBy: params.FilterGroupBy,
GroupBySubject: params.GroupBySubject,
GroupBy: params.GroupBy,
WindowSize: params.WindowSize,
Expand Down
22 changes: 22 additions & 0 deletions internal/streaming/clickhouse_connector/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ type queryMeterView struct {
MeterSlug string
Aggregation models.MeterAggregation
Subject []string
FilterGroupBy map[string][]string
From *time.Time
To *time.Time
GroupBy []string
Expand Down Expand Up @@ -321,6 +322,27 @@ func (d queryMeterView) toSQL() (string, []interface{}, error) {
where = append(where, queryView.Or(slicesx.Map(d.Subject, mapFunc)...))
}

if len(d.FilterGroupBy) > 0 {
// We sort the columns to ensure the query is deterministic
columns := make([]string, 0, len(d.FilterGroupBy))
for k := range d.FilterGroupBy {
columns = append(columns, k)
}
sort.Strings(columns)

for _, column := range columns {
values := d.FilterGroupBy[column]
if len(values) == 0 {
return "", nil, fmt.Errorf("empty filter for group by: %s", column)
}
mapFunc := func(value string) string {
return queryView.Equal(sqlbuilder.Escape(column), value)
}

where = append(where, queryView.Or(slicesx.Map(values, mapFunc)...))
}
}

if d.From != nil {
where = append(where, queryView.GreaterEqualThan("windowstart", d.From.Unix()))
}
Expand Down
33 changes: 33 additions & 0 deletions internal/streaming/clickhouse_connector/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,39 @@ func TestQueryMeterView(t *testing.T) {
wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (subject = ? OR subject = ?) GROUP BY subject",
wantArgs: []interface{}{"subject1", "subject2"},
},
{ // Aggregate data with filtering for a single group and single value
query: queryMeterView{
Database: "openmeter",
Namespace: "my_namespace",
MeterSlug: "meter1",
Aggregation: models.MeterAggregationSum,
FilterGroupBy: map[string][]string{"g1": {"g1v1"}},
},
wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (g1 = ?)",
wantArgs: []interface{}{"g1v1"},
},
{ // Aggregate data with filtering for a single group and multiple values
query: queryMeterView{
Database: "openmeter",
Namespace: "my_namespace",
MeterSlug: "meter1",
Aggregation: models.MeterAggregationSum,
FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}},
},
wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (g1 = ? OR g1 = ?)",
wantArgs: []interface{}{"g1v1", "g1v2"},
},
{ // Aggregate data with filtering for multiple groups and multiple values
query: queryMeterView{
Database: "openmeter",
Namespace: "my_namespace",
MeterSlug: "meter1",
Aggregation: models.MeterAggregationSum,
FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}, "g2": {"g2v1", "g2v2"}},
},
wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (g1 = ? OR g1 = ?) AND (g2 = ? OR g2 = ?)",
wantArgs: []interface{}{"g1v1", "g1v2", "g2v1", "g2v2"},
},
}

for _, tt := range tests {
Expand Down
1 change: 1 addition & 0 deletions internal/streaming/query_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type QueryParams struct {
From *time.Time
To *time.Time
Subject []string
FilterGroupBy map[string][]string
GroupBySubject bool
GroupBy []string
Aggregation models.MeterAggregation
Expand Down

0 comments on commit 850becc

Please sign in to comment.