Skip to content

Commit

Permalink
feat: collect and serve pre-agg bytes and count
Browse files Browse the repository at this point in the history
* pre-aggregate bytes and count per stream in the pattern ingester
* serve bytes_over_time and count_over_time queries from the patterns
  endpoint
  • Loading branch information
trevorwhitney committed May 23, 2024
1 parent 97212ea commit dc620e7
Show file tree
Hide file tree
Showing 32 changed files with 2,921 additions and 308 deletions.
3 changes: 3 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ schema_config:
prefix: index_
period: 24h

pattern_ingester:
enabled: true

ruler:
alertmanager_url: http://localhost:9093

Expand Down
18 changes: 16 additions & 2 deletions pkg/logproto/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func (r *QueryPatternsResponse) UnmarshalJSON(data []byte) error {
var v struct {
Status string `json:"status"`
Data []struct {
Pattern string `json:"pattern"`
Pattern string `json:"pattern,omitempty"`
Labels string `json:"labels,omitempty"`
Samples [][]int64 `json:"samples"`
} `json:"data"`
}
Expand All @@ -174,7 +175,12 @@ func (r *QueryPatternsResponse) UnmarshalJSON(data []byte) error {
for _, s := range d.Samples {
samples = append(samples, &PatternSample{Timestamp: model.TimeFromUnix(s[0]), Value: s[1]})
}
r.Series = append(r.Series, &PatternSeries{Pattern: d.Pattern, Samples: samples})

if pattern := d.Pattern; pattern != "" {
r.Series = append(r.Series, NewPatternSeriesWithPattern(pattern, samples))
} else if labels := d.Labels; labels != "" {
r.Series = append(r.Series, NewPatternSeriesWithLabels(labels, samples))
}
}
return nil
}
Expand All @@ -188,3 +194,11 @@ func (m *ShardsResponse) Merge(other *ShardsResponse) {
m.ChunkGroups = append(m.ChunkGroups, other.ChunkGroups...)
m.Statistics.Merge(other.Statistics)
}

func NewPatternSeriesWithPattern(pattern string, samples []*PatternSample) *PatternSeries {
return &PatternSeries{Identifier: &PatternSeries_Pattern{pattern}, Samples: samples}
}

func NewPatternSeriesWithLabels(labels string, samples []*PatternSample) *PatternSeries {
return &PatternSeries{Identifier: &PatternSeries_Labels{labels}, Samples: samples}
}
69 changes: 69 additions & 0 deletions pkg/logproto/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logproto
import (
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -40,3 +41,71 @@ func TestShard_SpaceFor(t *testing.T) {
})
}
}

func TestQueryPatternsResponse_UnmarshalJSON(t *testing.T) {
t.Run("unmarshals patterns", func(t *testing.T) {
mockData := []byte(`{
"status": "success",
"data": [
{
"pattern": "foo <*> bar",
"samples": [[1609459200, 10], [1609545600, 15]]
},
{
"pattern": "foo <*> buzz",
"samples": [[1609459200, 20], [1609545600, 25]]
}
]
}`)

expectedSeries := []*PatternSeries{
NewPatternSeriesWithPattern("foo <*> bar", []*PatternSample{
{Timestamp: model.TimeFromUnix(1609459200), Value: 10},
{Timestamp: model.TimeFromUnix(1609545600), Value: 15},
}),
NewPatternSeriesWithPattern("foo <*> buzz", []*PatternSample{
{Timestamp: model.TimeFromUnix(1609459200), Value: 20},
{Timestamp: model.TimeFromUnix(1609545600), Value: 25},
}),
}

r := &QueryPatternsResponse{}
err := r.UnmarshalJSON(mockData)

require.Nil(t, err)
require.Equal(t, expectedSeries, r.Series)
})

t.Run("unmarshals labels", func(t *testing.T) {
mockData := []byte(`{
"status": "success",
"data": [
{
"labels": "{foo=\"bar\"}",
"samples": [[1609459200, 10], [1609545600, 15]]
},
{
"labels": "{foo=\"buzz\"}",
"samples": [[1609459200, 20], [1609545600, 25]]
}
]
}`)

expectedSeries := []*PatternSeries{
NewPatternSeriesWithLabels(`{foo="bar"}`, []*PatternSample{
{Timestamp: model.TimeFromUnix(1609459200), Value: 10},
{Timestamp: model.TimeFromUnix(1609545600), Value: 15},
}),
NewPatternSeriesWithLabels(`{foo="buzz"}`, []*PatternSample{
{Timestamp: model.TimeFromUnix(1609459200), Value: 20},
{Timestamp: model.TimeFromUnix(1609545600), Value: 25},
}),
}

r := &QueryPatternsResponse{}
err := r.UnmarshalJSON(mockData)

require.Nil(t, err)
require.Equal(t, expectedSeries, r.Series)
})
}
Loading

0 comments on commit dc620e7

Please sign in to comment.