From 850becce529a707eb3c6b286a5c069b0f35008a2 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Mon, 27 Nov 2023 12:37:17 -0800 Subject: [PATCH] feat(connector): filter by group by (#447) --- .../clickhouse_connector/connector.go | 1 + .../streaming/clickhouse_connector/query.go | 22 +++++++++++++ .../clickhouse_connector/query_test.go | 33 +++++++++++++++++++ internal/streaming/query_params.go | 1 + 4 files changed, 57 insertions(+) diff --git a/internal/streaming/clickhouse_connector/connector.go b/internal/streaming/clickhouse_connector/connector.go index 73c199bb1..722272b0f 100644 --- a/internal/streaming/clickhouse_connector/connector.go +++ b/internal/streaming/clickhouse_connector/connector.go @@ -286,6 +286,7 @@ func (c *ClickhouseConnector) queryMeterView(ctx context.Context, namespace stri From: params.From, To: params.To, Subject: params.Subject, + FilterGroupBy: params.FilterGroupBy, GroupBySubject: params.GroupBySubject, GroupBy: params.GroupBy, WindowSize: params.WindowSize, diff --git a/internal/streaming/clickhouse_connector/query.go b/internal/streaming/clickhouse_connector/query.go index ad08324db..487566826 100644 --- a/internal/streaming/clickhouse_connector/query.go +++ b/internal/streaming/clickhouse_connector/query.go @@ -224,6 +224,7 @@ type queryMeterView struct { MeterSlug string Aggregation models.MeterAggregation Subject []string + FilterGroupBy map[string][]string From *time.Time To *time.Time GroupBy []string @@ -321,6 +322,27 @@ func (d queryMeterView) toSQL() (string, []interface{}, error) { where = append(where, queryView.Or(slicesx.Map(d.Subject, mapFunc)...)) } + if len(d.FilterGroupBy) > 0 { + // We sort the columns to ensure the query is deterministic + columns := make([]string, 0, len(d.FilterGroupBy)) + for k := range d.FilterGroupBy { + columns = append(columns, k) + } + sort.Strings(columns) + + for _, column := range columns { + values := d.FilterGroupBy[column] + if len(values) == 0 { + return "", nil, fmt.Errorf("empty filter for group by: %s", column) + } + mapFunc := func(value string) string { + return queryView.Equal(sqlbuilder.Escape(column), value) + } + + where = append(where, queryView.Or(slicesx.Map(values, mapFunc)...)) + } + } + if d.From != nil { where = append(where, queryView.GreaterEqualThan("windowstart", d.From.Unix())) } diff --git a/internal/streaming/clickhouse_connector/query_test.go b/internal/streaming/clickhouse_connector/query_test.go index cc992e370..3e3f808d7 100644 --- a/internal/streaming/clickhouse_connector/query_test.go +++ b/internal/streaming/clickhouse_connector/query_test.go @@ -297,6 +297,39 @@ func TestQueryMeterView(t *testing.T) { wantSQL: "SELECT min(windowstart), max(windowend), subject, sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (subject = ? OR subject = ?) GROUP BY subject", wantArgs: []interface{}{"subject1", "subject2"}, }, + { // Aggregate data with filtering for a single group and single value + query: queryMeterView{ + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + FilterGroupBy: map[string][]string{"g1": {"g1v1"}}, + }, + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (g1 = ?)", + wantArgs: []interface{}{"g1v1"}, + }, + { // Aggregate data with filtering for a single group and multiple values + query: queryMeterView{ + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}}, + }, + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (g1 = ? OR g1 = ?)", + wantArgs: []interface{}{"g1v1", "g1v2"}, + }, + { // Aggregate data with filtering for multiple groups and multiple values + query: queryMeterView{ + Database: "openmeter", + Namespace: "my_namespace", + MeterSlug: "meter1", + Aggregation: models.MeterAggregationSum, + FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}, "g2": {"g2v1", "g2v2"}}, + }, + wantSQL: "SELECT min(windowstart), max(windowend), sumMerge(value) AS value FROM openmeter.om_my_namespace_meter1 WHERE (g1 = ? OR g1 = ?) AND (g2 = ? OR g2 = ?)", + wantArgs: []interface{}{"g1v1", "g1v2", "g2v1", "g2v2"}, + }, } for _, tt := range tests { diff --git a/internal/streaming/query_params.go b/internal/streaming/query_params.go index 433c0e611..15f890652 100644 --- a/internal/streaming/query_params.go +++ b/internal/streaming/query_params.go @@ -12,6 +12,7 @@ type QueryParams struct { From *time.Time To *time.Time Subject []string + FilterGroupBy map[string][]string GroupBySubject bool GroupBy []string Aggregation models.MeterAggregation