diff --git a/Makefile b/Makefile index ac63fae..a3aa433 100644 --- a/Makefile +++ b/Makefile @@ -30,8 +30,10 @@ doc: get: $(GOGET) -t -v ./... -test: get +fmt: $(GOFMT) ./... + +test: get fmt $(GOTEST) -count=1 -race -covermode=atomic ./... coverage: get test diff --git a/common.go b/common.go index 0553076..ea17d29 100644 --- a/common.go +++ b/common.go @@ -10,9 +10,18 @@ import ( //go:generate stringer -type=AggregationType type AggregationType string +//go:generate stringer -type=ReducerType +type ReducerType string + //go:generate stringer -type=DuplicatePolicyType type DuplicatePolicyType string +const ( + SumReducer ReducerType = "SUM" + MinReducer ReducerType = "MIN" + MaxReducer ReducerType = "MAX" +) + const ( AvgAggregation AggregationType = "AVG" SumAggregation AggregationType = "SUM" diff --git a/example_client_test.go b/example_client_test.go index 90d58dd..b915f97 100644 --- a/example_client_test.go +++ b/example_client_test.go @@ -304,6 +304,44 @@ func ExampleClient_RangeWithOptions() { // Datapoints: [{1 1} {2 2} {3 3} {4 4} {5 5} {6 6} {7 7} {8 8} {9 9}] } +// Exemplifies the usage of RangeWithOptions function, while changing the reference timestamp on which a bucket is defined. +// nolint:errcheck +func ExampleClient_RangeWithOptions_aggregationMax() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + for ts := 1; ts < 10; ts++ { + client.Add("ts-1", int64(ts), float64(ts)) + } + + datapoints, _ := client.RangeWithOptions("ts-1", 0, 1000, *redistimeseries.NewRangeOptions().SetAggregation(redistimeseries.MaxAggregation, 5)) + fmt.Printf("Datapoints: %v\n", datapoints) + // Output: + // Datapoints: [{0 4} {5 9}] +} + +// Exemplifies the usage of RangeWithOptions function, while changing the reference timestamp on which a bucket is defined. +// nolint:errcheck +func ExampleClient_RangeWithOptions_aggregationAlign() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + for ts := 1; ts < 10; ts++ { + client.Add("ts-1", int64(ts), float64(ts)) + } + + datapoints, _ := client.RangeWithOptions("ts-1", 0, 1000, *redistimeseries.NewRangeOptions().SetAggregation(redistimeseries.CountAggregation, 2).SetAlign(1)) + fmt.Printf("Datapoints: %v\n", datapoints) + // Output: + // Datapoints: [{1 2} {3 2} {5 2} {7 2} {9 1}] +} + // nolint // Exemplifies the usage of ReverseRangeWithOptions function func ExampleClient_ReverseRangeWithOptions() { @@ -323,6 +361,44 @@ func ExampleClient_ReverseRangeWithOptions() { // Datapoints: [{9 9} {8 8} {7 7} {6 6} {5 5} {4 4} {3 3} {2 2} {1 1}] } +// nolint +// Exemplifies the usage of ReverseRangeWithOptions function while filtering value +func ExampleClient_ReverseRangeWithOptions_filterByValue() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + for ts := 1; ts < 10; ts++ { + client.Add("ts-2", int64(ts), float64(ts)) + } + + datapoints, _ := client.ReverseRangeWithOptions("ts-2", 0, 1000, *redistimeseries.NewRangeOptions().SetFilterByValue(5, 50)) + fmt.Printf("Datapoints: %v\n", datapoints) + // Output: + // Datapoints: [{9 9} {8 8} {7 7} {6 6} {5 5}] +} + +// nolint +// Exemplifies the usage of ReverseRangeWithOptions function while filtering by timestamp +func ExampleClient_ReverseRangeWithOptions_filterByTs() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + for ts := 1; ts < 10; ts++ { + client.Add("ts-2", int64(ts), float64(ts)) + } + + datapoints, _ := client.ReverseRangeWithOptions("ts-2", 0, 1000, *redistimeseries.NewRangeOptions().SetFilterByTs([]int64{1, 2, 3, 4, 5})) + fmt.Printf("Datapoints: %v\n", datapoints) + // Output: + // Datapoints: [{5 5} {4 4} {3 3} {2 2} {1 1}] +} + // nolint // Exemplifies the usage of MultiRangeWithOptions function. func ExampleClient_MultiRangeWithOptions() { @@ -333,6 +409,9 @@ func ExampleClient_MultiRangeWithOptions() { }} client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + // ensure clean DB + client.FlushAll() + labels1 := map[string]string{ "machine": "machine-1", "az": "us-east-1", @@ -354,6 +433,161 @@ func ExampleClient_MultiRangeWithOptions() { // Ranges: [{time-serie-1 map[] [{2 1} {4 2}]} {time-serie-2 map[] [{1 5} {4 10}]}] } +// nolint +// Exemplifies the usage of MultiRangeWithOptions function. +// grouping multiple time-series +func ExampleClient_MultiRangeWithOptions_groupByReduce() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + + // ensure clean DB + client.FlushAll() + + labels1 := map[string]string{ + "machine": "machine-1", + "az": "us-east-1", + "team": "team-1", + } + client.AddWithOptions("time-serie-1", 2, 1.0, redistimeseries.CreateOptions{Labels: labels1}) + client.Add("time-serie-1", 4, 2.0) + + labels2 := map[string]string{ + "machine": "machine-2", + "az": "us-east-1", + "team": "team-2", + } + client.AddWithOptions("time-serie-2", 1, 5.0, redistimeseries.CreateOptions{Labels: labels2}) + client.Add("time-serie-2", 4, 10.0) + + labels3 := map[string]string{ + "machine": "machine-3", + "az": "us-east-1", + "team": "team-2", + } + client.AddWithOptions("time-serie-3", 1, 55.0, redistimeseries.CreateOptions{Labels: labels3}) + client.Add("time-serie-3", 4, 99.0) + + // Find out the total resources usage by team + ranges, _ := client.MultiRangeWithOptions(1, 10, *redistimeseries.NewMultiRangeOptions().SetWithLabels(true).SetGroupByReduce("team", redistimeseries.SumReducer), "az=us-east-1") + + fmt.Printf("Sum of usage by team: %v\n", ranges) + // Output: + // Sum of usage by team: [{team=team-1 map[__reducer__:sum __source__:time-serie-1 team:team-1] [{2 1} {4 2}]} {team=team-2 map[__reducer__:sum __source__:time-serie-2,time-serie-3 team:team-2] [{1 60} {4 109}]}] +} + +// Exemplifies the usage of MultiRangeWithOptions function, +// filtering the result by specific timestamps +// nolint:errcheck +func ExampleClient_MultiRangeWithOptions_filterByTs() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + + // ensure clean DB + client.FlushAll() + + labels1 := map[string]string{ + "machine": "machine-1", + "az": "us-east-1", + } + client.AddWithOptions("time-serie-1", 2, 1.0, redistimeseries.CreateOptions{Labels: labels1}) + client.Add("time-serie-1", 4, 2.0) + + labels2 := map[string]string{ + "machine": "machine-2", + "az": "us-east-1", + } + client.AddWithOptions("time-serie-2", 1, 5.0, redistimeseries.CreateOptions{Labels: labels2}) + client.Add("time-serie-2", 4, 10.0) + + ranges, _ := client.MultiRangeWithOptions(1, 10, *redistimeseries.NewMultiRangeOptions().SetFilterByTs([]int64{1, 2}), "az=us-east-1") + + fmt.Printf("Ranges: %v\n", ranges) + // Output: + // Ranges: [{time-serie-1 map[] [{2 1}]} {time-serie-2 map[] [{1 5}]}] +} + +// Exemplifies the usage of MultiRangeWithOptions function, +// filtering the result by value using minimum and maximum. +// nolint:errcheck +func ExampleClient_MultiRangeWithOptions_filterByValue() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + + // ensure the DB is empty + client.FlushAll() + + labels1 := map[string]string{ + "machine": "machine-1", + "az": "us-east-1", + } + client.AddWithOptions("time-serie-1", 2, 1.0, redistimeseries.CreateOptions{Labels: labels1}) + client.Add("time-serie-1", 4, 2.0) + + labels2 := map[string]string{ + "machine": "machine-2", + "az": "us-east-1", + } + client.AddWithOptions("time-serie-2", 1, 2.0, redistimeseries.CreateOptions{Labels: labels2}) + client.Add("time-serie-2", 4, 10.0) + + ranges, _ := client.MultiRangeWithOptions(1, 10, *redistimeseries.NewMultiRangeOptions().SetFilterByValue(1, 5), "az=us-east-1") + + fmt.Printf("Ranges: %v\n", ranges) + // Output: + // Ranges: [{time-serie-1 map[] [{2 1} {4 2}]} {time-serie-2 map[] [{1 2}]}] +} + +// Exemplifies the usage of MultiRangeWithOptions function, +// filtering the returned labels. +// nolint:errcheck +func ExampleClient_MultiRangeWithOptions_selectedLabels() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + + // ensure the DB is empty + client.FlushAll() + + labels1 := map[string]string{ + "machine": "machine-1", + "team": "SF-1", + "location": "SF", + "az": "us-east-1", + } + client.AddWithOptions("selected-labels-ex-time-serie-1", 2, 1.0, redistimeseries.CreateOptions{Labels: labels1}) + client.Add("selected-labels-ex-time-serie-1", 4, 2.0) + + labels2 := map[string]string{ + "machine": "machine-2", + "team": "NY-1", + "location": "NY", + "az": "us-east-1", + } + client.AddWithOptions("selected-labels-ex-time-serie-2", 1, 10.0, redistimeseries.CreateOptions{Labels: labels2}) + client.Add("selected-labels-ex-time-serie-2", 4, 15.0) + + ranges, _ := client.MultiRangeWithOptions(1, 10, *redistimeseries.NewMultiRangeOptions().SetSelectedLabels([]string{"az", "location"}), "az=us-east-1") + + fmt.Printf("Ranges: %v\n", ranges) + // Output: + // Ranges: [{selected-labels-ex-time-serie-1 map[az:us-east-1 location:SF] [{2 1} {4 2}]} {selected-labels-ex-time-serie-2 map[az:us-east-1 location:NY] [{1 10} {4 15}]}] +} + // Exemplifies the usage of MultiReverseRangeWithOptions function. // nolint:errcheck func ExampleClient_MultiReverseRangeWithOptions() { @@ -364,6 +598,9 @@ func ExampleClient_MultiReverseRangeWithOptions() { }} client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + // ensure the DB is empty + client.FlushAll() + labels1 := map[string]string{ "machine": "machine-1", "az": "us-east-1", @@ -385,6 +622,112 @@ func ExampleClient_MultiReverseRangeWithOptions() { // Ranges: [{time-serie-1 map[] [{4 2} {2 1}]} {time-serie-2 map[] [{4 10} {1 5}]}] } +// Exemplifies the usage of MultiReverseRangeWithOptions function, +// filtering the result by specific timestamps +// nolint:errcheck +func ExampleClient_MultiReverseRangeWithOptions_filterByTs() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + + labels1 := map[string]string{ + "machine": "machine-1", + "az": "us-east-1", + } + client.AddWithOptions("time-serie-1", 2, 1.0, redistimeseries.CreateOptions{Labels: labels1}) + client.Add("time-serie-1", 4, 2.0) + + labels2 := map[string]string{ + "machine": "machine-2", + "az": "us-east-1", + } + client.AddWithOptions("time-serie-2", 1, 5.0, redistimeseries.CreateOptions{Labels: labels2}) + client.Add("time-serie-2", 4, 10.0) + + ranges, _ := client.MultiReverseRangeWithOptions(1, 10, *redistimeseries.NewMultiRangeOptions().SetFilterByTs([]int64{1, 2}), "az=us-east-1") + + fmt.Printf("Ranges: %v\n", ranges) + // Output: + // Ranges: [{time-serie-1 map[] [{2 1}]} {time-serie-2 map[] [{1 5}]}] +} + +// Exemplifies the usage of MultiReverseRangeWithOptions function, +// filtering the result by value using minimum and maximum. +// nolint:errcheck +func ExampleClient_MultiReverseRangeWithOptions_filterByValue() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + + // ensure the DB is empty + client.FlushAll() + + labels1 := map[string]string{ + "machine": "machine-1", + "az": "us-east-1", + } + client.AddWithOptions("time-serie-1", 2, 1.0, redistimeseries.CreateOptions{Labels: labels1}) + client.Add("time-serie-1", 4, 2.0) + + labels2 := map[string]string{ + "machine": "machine-2", + "az": "us-east-1", + } + client.AddWithOptions("time-serie-2", 1, 2.0, redistimeseries.CreateOptions{Labels: labels2}) + client.Add("time-serie-2", 4, 10.0) + + ranges, _ := client.MultiReverseRangeWithOptions(1, 10, *redistimeseries.NewMultiRangeOptions().SetFilterByValue(1, 5), "az=us-east-1") + + fmt.Printf("Ranges: %v\n", ranges) + // Output: + // Ranges: [{time-serie-1 map[] [{4 2} {2 1}]} {time-serie-2 map[] [{1 2}]}] +} + +// Exemplifies the usage of MultiReverseRangeWithOptions function, +// filtering the returned labels. +// nolint:errcheck +func ExampleClient_MultiReverseRangeWithOptions_selectedLabels() { + host := "localhost:6379" + password := "" + pool := &redis.Pool{Dial: func() (redis.Conn, error) { + return redis.Dial("tcp", host, redis.DialPassword(password)) + }} + client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + + // ensure the DB is empty + client.FlushAll() + + labels1 := map[string]string{ + "machine": "machine-1", + "team": "SF-1", + "location": "SF", + "az": "us-east-1", + } + client.AddWithOptions("selected-labels-ex-time-serie-1", 2, 1.0, redistimeseries.CreateOptions{Labels: labels1}) + client.Add("selected-labels-ex-time-serie-1", 4, 2.0) + + labels2 := map[string]string{ + "machine": "machine-2", + "team": "NY-1", + "location": "NY", + "az": "us-east-1", + } + client.AddWithOptions("selected-labels-ex-time-serie-2", 1, 10.0, redistimeseries.CreateOptions{Labels: labels2}) + client.Add("selected-labels-ex-time-serie-2", 4, 15.0) + + ranges, _ := client.MultiReverseRangeWithOptions(1, 10, *redistimeseries.NewMultiRangeOptions().SetSelectedLabels([]string{"az", "location"}), "az=us-east-1") + + fmt.Printf("Ranges: %v\n", ranges) + // Output: + // Ranges: [{selected-labels-ex-time-serie-1 map[az:us-east-1 location:SF] [{4 2} {2 1}]} {selected-labels-ex-time-serie-2 map[az:us-east-1 location:NY] [{4 15} {1 10}]}] +} + //nolint:errcheck // Exemplifies the usage of MultiGetWithOptions function while using the default MultiGetOptions and while using user defined MultiGetOptions. func ExampleClient_MultiGetWithOptions() { @@ -395,6 +738,9 @@ func ExampleClient_MultiGetWithOptions() { }} client := redistimeseries.NewClientFromPool(pool, "ts-client-1") + // ensure the DB is empty + client.FlushAll() + labels1 := map[string]string{ "machine": "machine-1", "az": "us-east-1", diff --git a/multirange.go b/multirange.go index 8d44184..3f175d0 100644 --- a/multirange.go +++ b/multirange.go @@ -1,32 +1,84 @@ package redis_timeseries_go -import "strconv" +import ( + "fmt" + "strconv" +) // MultiRangeOptions represent the options for querying across multiple time-series type MultiRangeOptions struct { - AggType AggregationType - TimeBucket int - Count int64 - WithLabels bool + AggType AggregationType + TimeBucket int + Count int64 + WithLabels bool + SelectedLabels []string + Align int64 + FilterByTs []int64 + FilterByValueMin *float64 + FilterByValueMax *float64 + GroupBy string + Reduce ReducerType } // MultiRangeOptions are the default options for querying across multiple time-series var DefaultMultiRangeOptions = MultiRangeOptions{ - AggType: "", - TimeBucket: -1, - Count: -1, - WithLabels: false, + AggType: "", + TimeBucket: -1, + Count: -1, + WithLabels: false, + SelectedLabels: []string{}, + Align: -1, + FilterByTs: []int64{}, + FilterByValueMin: nil, + FilterByValueMax: nil, + GroupBy: "", + Reduce: "", } func NewMultiRangeOptions() *MultiRangeOptions { return &MultiRangeOptions{ - AggType: "", - TimeBucket: -1, - Count: -1, - WithLabels: false, + AggType: "", + TimeBucket: -1, + Count: -1, + WithLabels: false, + SelectedLabels: []string{}, + Align: -1, + FilterByTs: []int64{}, + FilterByValueMin: nil, + FilterByValueMax: nil, + GroupBy: "", + Reduce: "", } } +// SetGroupByReduce Aggregates results across different time series, grouped by the provided label name. +// When combined with AGGREGATION the groupby/reduce is applied post aggregation stage. +func (mrangeopts *MultiRangeOptions) SetGroupByReduce(byLabel string, reducer ReducerType) *MultiRangeOptions { + mrangeopts.GroupBy = byLabel + mrangeopts.Reduce = reducer + return mrangeopts +} + +// SetAlign sets the time bucket alignment control for AGGREGATION. +// This will control the time bucket timestamps by changing the reference timestamp on which a bucket is defined. +func (mrangeopts *MultiRangeOptions) SetAlign(byTimeStamp int64) *MultiRangeOptions { + mrangeopts.Align = byTimeStamp + return mrangeopts +} + +// SetFilterByTs sets the list of timestamps to filter the result by specific timestamps +func (mrangeopts *MultiRangeOptions) SetFilterByTs(filterByTS []int64) *MultiRangeOptions { + mrangeopts.FilterByTs = filterByTS + return mrangeopts +} + +// SetFilterByValue filters the result by value using minimum and maximum ( inclusive ) +func (mrangeopts *MultiRangeOptions) SetFilterByValue(min, max float64) *MultiRangeOptions { + mrangeopts.FilterByValueMin = &min + mrangeopts.FilterByValueMax = &max + return mrangeopts +} + func (mrangeopts *MultiRangeOptions) SetCount(count int64) *MultiRangeOptions { mrangeopts.Count = count return mrangeopts @@ -43,8 +95,25 @@ func (mrangeopts *MultiRangeOptions) SetWithLabels(value bool) *MultiRangeOption return mrangeopts } +// SetSelectedLabels limits the series reply labels to provided label names +func (mrangeopts *MultiRangeOptions) SetSelectedLabels(labels []string) *MultiRangeOptions { + mrangeopts.SelectedLabels = labels + return mrangeopts +} + func createMultiRangeCmdArguments(fromTimestamp int64, toTimestamp int64, mrangeOptions MultiRangeOptions, filters []string) []interface{} { args := []interface{}{strconv.FormatInt(fromTimestamp, 10), strconv.FormatInt(toTimestamp, 10)} + if mrangeOptions.FilterByValueMin != nil { + args = append(args, "FILTER_BY_VALUE", + fmt.Sprintf("%f", *mrangeOptions.FilterByValueMin), + fmt.Sprintf("%f", *mrangeOptions.FilterByValueMax)) + } + if len(mrangeOptions.FilterByTs) > 0 { + args = append(args, "FILTER_BY_TS") + for _, timestamp := range mrangeOptions.FilterByTs { + args = append(args, strconv.FormatInt(timestamp, 10)) + } + } if mrangeOptions.AggType != "" { args = append(args, "AGGREGATION", mrangeOptions.AggType, strconv.Itoa(mrangeOptions.TimeBucket)) } @@ -53,10 +122,21 @@ func createMultiRangeCmdArguments(fromTimestamp int64, toTimestamp int64, mrange } if mrangeOptions.WithLabels { args = append(args, "WITHLABELS") + } else if len(mrangeOptions.SelectedLabels) > 0 { + args = append(args, "SELECTED_LABELS") + for _, label := range mrangeOptions.SelectedLabels { + args = append(args, label) + } + } + if mrangeOptions.Align != -1 { + args = append(args, "ALIGN", strconv.FormatInt(mrangeOptions.Align, 10)) } args = append(args, "FILTER") for _, filter := range filters { args = append(args, filter) } + if mrangeOptions.GroupBy != "" { + args = append(args, "GROUPBY", mrangeOptions.GroupBy, "REDUCE", string(mrangeOptions.Reduce)) + } return args } diff --git a/multirange_test.go b/multirange_test.go index ca2480f..97ebdbc 100644 --- a/multirange_test.go +++ b/multirange_test.go @@ -17,10 +17,49 @@ func TestCreateMultiRangeCmdArguments(t *testing.T) { args args want []interface{} }{ - {"default", args{0, 1, DefaultMultiRangeOptions, []string{"labels!="}}, []interface{}{"0", "1", "FILTER", "labels!="}}, - {"withlabels", args{0, 1, *(NewMultiRangeOptions().SetWithLabels(true)), []string{"labels!="}}, []interface{}{"0", "1", "WITHLABELS", "FILTER", "labels!="}}, - {"withlabels and aggregation", args{0, 1, *(NewMultiRangeOptions().SetAggregation(AvgAggregation, 60).SetWithLabels(true)), []string{"labels!="}}, []interface{}{"0", "1", "AGGREGATION", AvgAggregation, "60", "WITHLABELS", "FILTER", "labels!="}}, - {"withlabels, aggregation and count", args{0, 1, *(NewMultiRangeOptions().SetAggregation(AvgAggregation, 60).SetWithLabels(true).SetCount(120)), []string{"labels!="}}, []interface{}{"0", "1", "AGGREGATION", AvgAggregation, "60", "COUNT", "120", "WITHLABELS", "FILTER", "labels!="}}, + {"default", + args{0, 1, DefaultMultiRangeOptions, []string{"labels!="}}, + []interface{}{"0", "1", "FILTER", "labels!="}}, + {"withlabels", + args{0, 1, *(NewMultiRangeOptions().SetWithLabels(true)), + []string{"labels!="}}, + []interface{}{"0", "1", "WITHLABELS", "FILTER", "labels!="}}, + {"withlabels and aggregation", + args{0, 1, *(NewMultiRangeOptions().SetAggregation(AvgAggregation, 60).SetWithLabels(true)), + []string{"labels!="}}, + []interface{}{"0", "1", "AGGREGATION", AvgAggregation, "60", "WITHLABELS", "FILTER", "labels!="}}, + {"withlabels, aggregation and count", + args{0, 1, *(NewMultiRangeOptions().SetAggregation(AvgAggregation, 60).SetWithLabels(true).SetCount(120)), + []string{"labels!="}}, + []interface{}{"0", "1", "AGGREGATION", AvgAggregation, "60", "COUNT", "120", "WITHLABELS", "FILTER", "labels!="}}, + {"withlabels, aggregation, count, and align", + args{0, 1, *(NewMultiRangeOptions().SetAggregation(AvgAggregation, 60).SetWithLabels(true).SetCount(120).SetAlign(10)), + []string{"labels!="}}, + []interface{}{"0", "1", "AGGREGATION", AvgAggregation, "60", "COUNT", "120", "WITHLABELS", "ALIGN", "10", "FILTER", "labels!="}}, + {"withlabels, aggregation, count, and align, filter by ts", + args{0, 1, *(NewMultiRangeOptions().SetAggregation(AvgAggregation, 60).SetWithLabels(true).SetCount(120).SetAlign(10).SetFilterByTs([]int64{10, 11, 12, 13})), + []string{"labels!="}}, + []interface{}{"0", "1", "FILTER_BY_TS", "10", "11", "12", "13", "AGGREGATION", AvgAggregation, "60", "COUNT", "120", "WITHLABELS", "ALIGN", "10", "FILTER", "labels!="}}, + {"withlabels, aggregation, count, and align, filter by value", + args{0, 1, *(NewMultiRangeOptions().SetAggregation(AvgAggregation, 60).SetWithLabels(true).SetCount(120).SetAlign(10).SetFilterByValue(10, 13)), + []string{"labels!="}}, + []interface{}{"0", "1", "FILTER_BY_VALUE", "10.000000", "13.000000", "AGGREGATION", AvgAggregation, "60", "COUNT", "120", "WITHLABELS", "ALIGN", "10", "FILTER", "labels!="}}, + {"selected_labels, aggregation, count, and align, filter by value", + args{0, 1, *(NewMultiRangeOptions().SetAggregation(AvgAggregation, 60).SetSelectedLabels([]string{"l1", "l2"}).SetCount(120).SetAlign(10).SetFilterByValue(10, 13)), + []string{"labels!="}}, + []interface{}{"0", "1", "FILTER_BY_VALUE", "10.000000", "13.000000", "AGGREGATION", AvgAggregation, "60", "COUNT", "120", "SELECTED_LABELS", "l1", "l2", "ALIGN", "10", "FILTER", "labels!="}}, + {"groupby l2 reduce max", + args{0, 1, *NewMultiRangeOptions().SetGroupByReduce("l2", MaxReducer), + []string{"labels!="}}, + []interface{}{"0", "1", "FILTER", "labels!=", "GROUPBY", "l2", "REDUCE", "MAX"}}, + {"groupby l2 reduce min", + args{0, 1, *NewMultiRangeOptions().SetGroupByReduce("l2", MinReducer), + []string{"labels!="}}, + []interface{}{"0", "1", "FILTER", "labels!=", "GROUPBY", "l2", "REDUCE", "MIN"}}, + {"groupby l2 reduce sum", + args{0, 1, *NewMultiRangeOptions().SetGroupByReduce("l2", SumReducer), + []string{"labels!="}}, + []interface{}{"0", "1", "FILTER", "labels!=", "GROUPBY", "l2", "REDUCE", "SUM"}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/range.go b/range.go index e3c5f19..bc477eb 100644 --- a/range.go +++ b/range.go @@ -1,21 +1,30 @@ package redis_timeseries_go import ( + "fmt" "strconv" ) // MultiRangeOptions represent the options for querying across multiple time-series type RangeOptions struct { - AggType AggregationType - TimeBucket int - Count int64 + AggType AggregationType + TimeBucket int + Count int64 + Align int64 + FilterByTs []int64 + FilterByValueMin *float64 + FilterByValueMax *float64 } func NewRangeOptions() *RangeOptions { return &RangeOptions{ - AggType: "", - TimeBucket: -1, - Count: -1, + AggType: "", + TimeBucket: -1, + Count: -1, + Align: -1, + FilterByTs: []int64{}, + FilterByValueMin: nil, + FilterByValueMax: nil, } } @@ -27,6 +36,26 @@ func (rangeopts *RangeOptions) SetCount(count int64) *RangeOptions { return rangeopts } +// SetAlign sets the time bucket alignment control for AGGREGATION. +// This will control the time bucket timestamps by changing the reference timestamp on which a bucket is defined. +func (rangeopts *RangeOptions) SetAlign(byTimeStamp int64) *RangeOptions { + rangeopts.Align = byTimeStamp + return rangeopts +} + +// SetFilterByTs sets a list of timestamps to filter the result by specific timestamps +func (rangeopts *RangeOptions) SetFilterByTs(filterByTS []int64) *RangeOptions { + rangeopts.FilterByTs = filterByTS + return rangeopts +} + +// SetFilterByValue filters result by value using minimum and maximum ( inclusive ) +func (rangeopts *RangeOptions) SetFilterByValue(min, max float64) *RangeOptions { + rangeopts.FilterByValueMin = &min + rangeopts.FilterByValueMax = &max + return rangeopts +} + func (rangeopts *RangeOptions) SetAggregation(aggType AggregationType, timeBucket int) *RangeOptions { rangeopts.AggType = aggType rangeopts.TimeBucket = timeBucket @@ -35,11 +64,25 @@ func (rangeopts *RangeOptions) SetAggregation(aggType AggregationType, timeBucke func createRangeCmdArguments(key string, fromTimestamp int64, toTimestamp int64, rangeOptions RangeOptions) []interface{} { args := []interface{}{key, strconv.FormatInt(fromTimestamp, 10), strconv.FormatInt(toTimestamp, 10)} + if rangeOptions.FilterByValueMin != nil { + args = append(args, "FILTER_BY_VALUE", + fmt.Sprintf("%f", *rangeOptions.FilterByValueMin), + fmt.Sprintf("%f", *rangeOptions.FilterByValueMax)) + } + if len(rangeOptions.FilterByTs) > 0 { + args = append(args, "FILTER_BY_TS") + for _, timestamp := range rangeOptions.FilterByTs { + args = append(args, strconv.FormatInt(timestamp, 10)) + } + } if rangeOptions.AggType != "" { args = append(args, "AGGREGATION", rangeOptions.AggType, strconv.Itoa(rangeOptions.TimeBucket)) } if rangeOptions.Count != -1 { args = append(args, "COUNT", strconv.FormatInt(rangeOptions.Count, 10)) } + if rangeOptions.Align != -1 { + args = append(args, "ALIGN", strconv.FormatInt(rangeOptions.Align, 10)) + } return args } diff --git a/range_test.go b/range_test.go index 722a842..2fa1f19 100644 --- a/range_test.go +++ b/range_test.go @@ -18,8 +18,21 @@ func TestCreateRangeCmdArguments(t *testing.T) { want []interface{} }{ {"default", args{"key", 0, 1, DefaultRangeOptions}, []interface{}{"key", "0", "1"}}, - {"aggregation", args{"key", 0, 1, *NewRangeOptions().SetAggregation(AvgAggregation, 60)}, []interface{}{"key", "0", "1", "AGGREGATION", AvgAggregation, "60"}}, - {"aggregation and count", args{"key", 0, 1, *NewRangeOptions().SetAggregation(AvgAggregation, 60).SetCount(120)}, []interface{}{"key", "0", "1", "AGGREGATION", AvgAggregation, "60", "COUNT", "120"}}, + {"aggregation", + args{"key", 0, 1, *NewRangeOptions().SetAggregation(AvgAggregation, 60)}, + []interface{}{"key", "0", "1", "AGGREGATION", AvgAggregation, "60"}}, + {"aggregation and count", + args{"key", 0, 1, *NewRangeOptions().SetAggregation(AvgAggregation, 60).SetCount(120)}, + []interface{}{"key", "0", "1", "AGGREGATION", AvgAggregation, "60", "COUNT", "120"}}, + {"aggregation and align", + args{"key", 0, 1, *NewRangeOptions().SetAggregation(AvgAggregation, 60).SetCount(120).SetAlign(4)}, + []interface{}{"key", "0", "1", "AGGREGATION", AvgAggregation, "60", "COUNT", "120", "ALIGN", "4"}}, + {"aggregation and filter by ts", + args{"key", 0, 1, *NewRangeOptions().SetAggregation(AvgAggregation, 60).SetCount(120).SetFilterByTs([]int64{10, 5, 11})}, + []interface{}{"key", "0", "1", "FILTER_BY_TS", "10", "5", "11", "AGGREGATION", AvgAggregation, "60", "COUNT", "120"}}, + {"aggregation and filter by value", + args{"key", 0, 1, *NewRangeOptions().SetAggregation(AvgAggregation, 60).SetCount(120).SetFilterByValue(5.0, 55.0)}, + []interface{}{"key", "0", "1", "FILTER_BY_VALUE", "5.000000", "55.000000", "AGGREGATION", AvgAggregation, "60", "COUNT", "120"}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {