From dc620e7882ccb09cacfd1e231b92d2e4fe19470b Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 8 May 2024 14:08:44 -0600 Subject: [PATCH] feat: collect and serve pre-agg bytes and count * pre-aggregate bytes and count per stream in the pattern ingester * serve bytes_over_time and count_over_time queries from the patterns endpoint --- cmd/loki/loki-local-config.yaml | 3 + pkg/logproto/extensions.go | 18 +- pkg/logproto/extensions_test.go | 69 +++++ pkg/logproto/pattern.pb.go | 316 +++++++++++++++++++---- pkg/logproto/pattern.proto | 5 +- pkg/logql/evaluator.go | 24 +- pkg/logql/range_vector.go | 49 ++-- pkg/pattern/chunk/util.go | 14 ++ pkg/pattern/drain/chunk.go | 32 ++- pkg/pattern/drain/chunk_test.go | 32 ++- pkg/pattern/flush_test.go | 4 +- pkg/pattern/ingester.go | 74 +++++- pkg/pattern/ingester_querier.go | 51 +++- pkg/pattern/ingester_querier_test.go | 6 +- pkg/pattern/ingester_test.go | 265 +++++++++++++++++-- pkg/pattern/instance.go | 57 ++++- pkg/pattern/instance_test.go | 115 +++++++++ pkg/pattern/iter/batch.go | 46 +++- pkg/pattern/iter/batch_test.go | 20 +- pkg/pattern/iter/iterator.go | 176 ++++++++++++- pkg/pattern/iter/iterator_test.go | 135 +++++++--- pkg/pattern/iter/merge.go | 14 +- pkg/pattern/iter/merge_test.go | 188 +++++++++----- pkg/pattern/iter/query_client.go | 18 +- pkg/pattern/metric/chunk.go | 201 +++++++++++++++ pkg/pattern/metric/chunk_test.go | 329 ++++++++++++++++++++++++ pkg/pattern/metric/evaluator.go | 354 ++++++++++++++++++++++++++ pkg/pattern/metric/evaluator_test.go | 363 +++++++++++++++++++++++++++ pkg/pattern/stream.go | 155 ++++++++++++ pkg/pattern/stream_test.go | 4 +- pkg/util/marshal/marshal.go | 10 +- pkg/util/marshal/marshal_test.go | 82 ++++-- 32 files changed, 2921 insertions(+), 308 deletions(-) create mode 100644 pkg/pattern/chunk/util.go create mode 100644 pkg/pattern/instance_test.go create mode 100644 pkg/pattern/metric/chunk.go create mode 100644 pkg/pattern/metric/chunk_test.go create mode 100644 pkg/pattern/metric/evaluator.go create mode 100644 pkg/pattern/metric/evaluator_test.go diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 03b579647753a..5f717a3d6a81c 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -33,6 +33,9 @@ schema_config: prefix: index_ period: 24h +pattern_ingester: + enabled: true + ruler: alertmanager_url: http://localhost:9093 diff --git a/pkg/logproto/extensions.go b/pkg/logproto/extensions.go index 3de5c0fd75801..bd726bbb0d358 100644 --- a/pkg/logproto/extensions.go +++ b/pkg/logproto/extensions.go @@ -161,7 +161,8 @@ func (r *QueryPatternsResponse) UnmarshalJSON(data []byte) error { var v struct { Status string `json:"status"` Data []struct { - Pattern string `json:"pattern"` + Pattern string `json:"pattern,omitempty"` + Labels string `json:"labels,omitempty"` Samples [][]int64 `json:"samples"` } `json:"data"` } @@ -174,7 +175,12 @@ func (r *QueryPatternsResponse) UnmarshalJSON(data []byte) error { for _, s := range d.Samples { samples = append(samples, &PatternSample{Timestamp: model.TimeFromUnix(s[0]), Value: s[1]}) } - r.Series = append(r.Series, &PatternSeries{Pattern: d.Pattern, Samples: samples}) + + if pattern := d.Pattern; pattern != "" { + r.Series = append(r.Series, NewPatternSeriesWithPattern(pattern, samples)) + } else if labels := d.Labels; labels != "" { + r.Series = append(r.Series, NewPatternSeriesWithLabels(labels, samples)) + } } return nil } @@ -188,3 +194,11 @@ func (m *ShardsResponse) Merge(other *ShardsResponse) 
{ m.ChunkGroups = append(m.ChunkGroups, other.ChunkGroups...) m.Statistics.Merge(other.Statistics) } + +func NewPatternSeriesWithPattern(pattern string, samples []*PatternSample) *PatternSeries { + return &PatternSeries{Identifier: &PatternSeries_Pattern{pattern}, Samples: samples} +} + +func NewPatternSeriesWithLabels(labels string, samples []*PatternSample) *PatternSeries { + return &PatternSeries{Identifier: &PatternSeries_Labels{labels}, Samples: samples} +} diff --git a/pkg/logproto/extensions_test.go b/pkg/logproto/extensions_test.go index d1c96c76bbed3..de8e84e34cd35 100644 --- a/pkg/logproto/extensions_test.go +++ b/pkg/logproto/extensions_test.go @@ -3,6 +3,7 @@ package logproto import ( "testing" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -40,3 +41,71 @@ func TestShard_SpaceFor(t *testing.T) { }) } } + +func TestQueryPatternsResponse_UnmarshalJSON(t *testing.T) { + t.Run("unmarshals patterns", func(t *testing.T) { + mockData := []byte(`{ + "status": "success", + "data": [ + { + "pattern": "foo <*> bar", + "samples": [[1609459200, 10], [1609545600, 15]] + }, + { + "pattern": "foo <*> buzz", + "samples": [[1609459200, 20], [1609545600, 25]] + } + ] + }`) + + expectedSeries := []*PatternSeries{ + NewPatternSeriesWithPattern("foo <*> bar", []*PatternSample{ + {Timestamp: model.TimeFromUnix(1609459200), Value: 10}, + {Timestamp: model.TimeFromUnix(1609545600), Value: 15}, + }), + NewPatternSeriesWithPattern("foo <*> buzz", []*PatternSample{ + {Timestamp: model.TimeFromUnix(1609459200), Value: 20}, + {Timestamp: model.TimeFromUnix(1609545600), Value: 25}, + }), + } + + r := &QueryPatternsResponse{} + err := r.UnmarshalJSON(mockData) + + require.Nil(t, err) + require.Equal(t, expectedSeries, r.Series) + }) + + t.Run("unmarshals labels", func(t *testing.T) { + mockData := []byte(`{ + "status": "success", + "data": [ + { + "labels": "{foo=\"bar\"}", + "samples": [[1609459200, 10], [1609545600, 15]] + }, + { + "labels": "{foo=\"buzz\"}", + "samples": [[1609459200, 20], [1609545600, 25]] + } + ] + }`) + + expectedSeries := []*PatternSeries{ + NewPatternSeriesWithLabels(`{foo="bar"}`, []*PatternSample{ + {Timestamp: model.TimeFromUnix(1609459200), Value: 10}, + {Timestamp: model.TimeFromUnix(1609545600), Value: 15}, + }), + NewPatternSeriesWithLabels(`{foo="buzz"}`, []*PatternSample{ + {Timestamp: model.TimeFromUnix(1609459200), Value: 20}, + {Timestamp: model.TimeFromUnix(1609545600), Value: 25}, + }), + } + + r := &QueryPatternsResponse{} + err := r.UnmarshalJSON(mockData) + + require.Nil(t, err) + require.Equal(t, expectedSeries, r.Series) + }) +} diff --git a/pkg/logproto/pattern.pb.go b/pkg/logproto/pattern.pb.go index a666a32850127..facf0b4bfa907 100644 --- a/pkg/logproto/pattern.pb.go +++ b/pkg/logproto/pattern.pb.go @@ -146,8 +146,11 @@ func (m *QueryPatternsResponse) GetSeries() []*PatternSeries { } type PatternSeries struct { - Pattern string `protobuf:"bytes,1,opt,name=pattern,proto3" json:"pattern,omitempty"` - Samples []*PatternSample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples,omitempty"` + // Types that are valid to be assigned to Identifier: + // *PatternSeries_Pattern + // *PatternSeries_Labels + Identifier isPatternSeries_Identifier `protobuf_oneof:"identifier"` + Samples []*PatternSample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples,omitempty"` } func (m *PatternSeries) Reset() { *m = PatternSeries{} } @@ -182,9 +185,40 @@ func (m *PatternSeries) XXX_DiscardUnknown() { var xxx_messageInfo_PatternSeries 
proto.InternalMessageInfo -func (m *PatternSeries) GetPattern() string { +type isPatternSeries_Identifier interface { + isPatternSeries_Identifier() + Equal(interface{}) bool + MarshalTo([]byte) (int, error) + Size() int +} + +type PatternSeries_Pattern struct { + Pattern string `protobuf:"bytes,1,opt,name=pattern,proto3,oneof"` +} +type PatternSeries_Labels struct { + Labels string `protobuf:"bytes,3,opt,name=labels,proto3,oneof"` +} + +func (*PatternSeries_Pattern) isPatternSeries_Identifier() {} +func (*PatternSeries_Labels) isPatternSeries_Identifier() {} + +func (m *PatternSeries) GetIdentifier() isPatternSeries_Identifier { if m != nil { - return m.Pattern + return m.Identifier + } + return nil +} + +func (m *PatternSeries) GetPattern() string { + if x, ok := m.GetIdentifier().(*PatternSeries_Pattern); ok { + return x.Pattern + } + return "" +} + +func (m *PatternSeries) GetLabels() string { + if x, ok := m.GetIdentifier().(*PatternSeries_Labels); ok { + return x.Labels } return "" } @@ -196,6 +230,14 @@ func (m *PatternSeries) GetSamples() []*PatternSample { return nil } +// XXX_OneofWrappers is for the internal use of the proto package. +func (*PatternSeries) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*PatternSeries_Pattern)(nil), + (*PatternSeries_Labels)(nil), + } +} + type PatternSample struct { Timestamp github_com_prometheus_common_model.Time `protobuf:"varint,1,opt,name=timestamp,proto3,customtype=github.com/prometheus/common/model.Time" json:"timestamp"` Value int64 `protobuf:"varint,2,opt,name=value,proto3" json:"value,omitempty"` @@ -250,38 +292,39 @@ func init() { func init() { proto.RegisterFile("pkg/logproto/pattern.proto", fileDescriptor_aaf4192acc66a4ea) } var fileDescriptor_aaf4192acc66a4ea = []byte{ - // 483 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xb1, 0x6e, 0xd3, 0x40, - 0x18, 0xf6, 0xd5, 0x49, 0xd3, 0x5e, 0xc5, 0x72, 0xa4, 0x60, 0x19, 0xe9, 0x1c, 0x79, 0x21, 0x93, - 0x0f, 0x52, 0x09, 0x24, 0xc6, 0x4c, 0x0c, 0x20, 0x15, 0xc3, 0x84, 0x60, 0x70, 0xda, 0xbf, 0xb6, - 0x55, 0xdb, 0xe7, 0xfa, 0xee, 0x2a, 0xb1, 0xf1, 0x08, 0x79, 0x0c, 0x1e, 0x80, 0x87, 0xe8, 0x98, - 0xb1, 0x62, 0x28, 0xc4, 0x59, 0x18, 0xfb, 0x08, 0xc8, 0x77, 0x76, 0x93, 0x56, 0x74, 0xe8, 0x92, - 0xdc, 0xff, 0x7f, 0xdf, 0xff, 0xf9, 0xbb, 0xff, 0x3b, 0xec, 0x96, 0xa7, 0x31, 0xcb, 0x78, 0x5c, - 0x56, 0x5c, 0x72, 0x56, 0x46, 0x52, 0x42, 0x55, 0x04, 0xba, 0x22, 0x3b, 0x5d, 0xdf, 0x1d, 0xc6, - 0x3c, 0xe6, 0x86, 0xd2, 0x9c, 0x0c, 0xee, 0x7a, 0x31, 0xe7, 0x71, 0x06, 0x4c, 0x57, 0x33, 0x75, - 0xc2, 0x64, 0x9a, 0x83, 0x90, 0x51, 0x5e, 0xb6, 0x84, 0x67, 0xb7, 0xc4, 0xbb, 0x43, 0x0b, 0x3e, - 0x6e, 0xc0, 0x52, 0x89, 0x44, 0xff, 0x98, 0xa6, 0xff, 0x13, 0xe1, 0xe1, 0x07, 0x05, 0xd5, 0xb7, - 0x43, 0xe3, 0x44, 0x84, 0x70, 0xa6, 0x40, 0x48, 0x32, 0xc4, 0xfd, 0xb3, 0xa6, 0xef, 0xa0, 0x11, - 0x1a, 0xef, 0x86, 0xa6, 0x20, 0x6f, 0x70, 0x5f, 0xc8, 0xa8, 0x92, 0xce, 0xd6, 0x08, 0x8d, 0xf7, - 0x26, 0x6e, 0x60, 0x1c, 0x05, 0x9d, 0xa3, 0xe0, 0x53, 0xe7, 0x68, 0xba, 0x73, 0x71, 0xe5, 0x59, - 0xf3, 0xdf, 0x1e, 0x0a, 0xcd, 0x08, 0x79, 0x85, 0x6d, 0x28, 0x8e, 0x1d, 0xfb, 0x01, 0x93, 0xcd, - 0x00, 0x21, 0xb8, 0x27, 0x24, 0x94, 0x4e, 0x6f, 0x84, 0xc6, 0x76, 0xa8, 0xcf, 0xfe, 0x5b, 0xbc, - 0x7f, 0xc7, 0xb5, 0x28, 0x79, 0x21, 0x80, 0x30, 0xbc, 0x2d, 0xa0, 0x4a, 0x41, 0x38, 0x68, 0x64, - 0x8f, 0xf7, 0x26, 0x4f, 0x83, 0x9b, 0x2d, 0xb4, 0xdc, 0x8f, 0x1a, 0x0e, 0x5b, 0x9a, 0xff, 0x05, - 0x3f, 0xba, 0x05, 0x10, 0x07, 0x0f, 0xda, 0x54, 0xda, 0xab, 0x77, 
0x25, 0x79, 0x89, 0x07, 0x22, - 0xca, 0xcb, 0x0c, 0x84, 0xb3, 0x75, 0x9f, 0xb8, 0xc6, 0xc3, 0x8e, 0xe7, 0xcb, 0xb5, 0xba, 0xee, - 0x90, 0xf7, 0x78, 0xf7, 0x26, 0x34, 0xad, 0x6f, 0x4f, 0x59, 0x73, 0xdd, 0x5f, 0x57, 0xde, 0xf3, - 0x38, 0x95, 0x89, 0x9a, 0x05, 0x47, 0x3c, 0x6f, 0x12, 0xce, 0x41, 0x26, 0xa0, 0x04, 0x3b, 0xe2, - 0x79, 0xce, 0x0b, 0x96, 0xf3, 0x63, 0xc8, 0xf4, 0x92, 0xc2, 0xb5, 0x42, 0x93, 0xd2, 0x79, 0x94, - 0x29, 0xd0, 0x79, 0xd8, 0xa1, 0x29, 0x26, 0x73, 0x84, 0x07, 0xed, 0x67, 0xc9, 0x6b, 0xdc, 0x3b, - 0x54, 0x22, 0x21, 0xfb, 0x1b, 0x5e, 0x95, 0x48, 0xda, 0x98, 0xdd, 0x27, 0x77, 0xdb, 0x66, 0x8f, - 0xbe, 0x45, 0xde, 0xe1, 0xbe, 0x5e, 0x31, 0xa1, 0x6b, 0xca, 0xff, 0x5e, 0x8a, 0xeb, 0xdd, 0x8b, - 0x77, 0x5a, 0x2f, 0xd0, 0xf4, 0xeb, 0x62, 0x49, 0xad, 0xcb, 0x25, 0xb5, 0xae, 0x97, 0x14, 0x7d, - 0xaf, 0x29, 0xfa, 0x51, 0x53, 0x74, 0x51, 0x53, 0xb4, 0xa8, 0x29, 0xfa, 0x53, 0x53, 0xf4, 0xb7, - 0xa6, 0xd6, 0x75, 0x4d, 0xd1, 0x7c, 0x45, 0xad, 0xc5, 0x8a, 0x5a, 0x97, 0x2b, 0x6a, 0x7d, 0xde, - 0x5c, 0x49, 0x5c, 0x45, 0x27, 0x51, 0x11, 0xb1, 0x8c, 0x9f, 0xa6, 0xec, 0xfc, 0x80, 0x6d, 0x3e, - 0xf5, 0xd9, 0xb6, 0xfe, 0x3b, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff, 0x3b, 0x4f, 0x5c, 0x50, 0x5e, - 0x03, 0x00, 0x00, + // 511 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xbd, 0x6e, 0xd4, 0x40, + 0x10, 0xf6, 0xc6, 0xf7, 0x93, 0x6c, 0xa0, 0x59, 0x2e, 0x60, 0x19, 0x69, 0x7d, 0x72, 0xc3, 0x55, + 0x5e, 0xb8, 0x48, 0x20, 0x51, 0x5e, 0x95, 0x02, 0xa4, 0x60, 0xa8, 0x90, 0x28, 0x7c, 0xb9, 0x39, + 0xdb, 0x8a, 0xed, 0x75, 0xbc, 0xeb, 0x48, 0x74, 0x54, 0xd4, 0xf7, 0x18, 0x3c, 0x00, 0x0f, 0x91, + 0xf2, 0xca, 0x88, 0x22, 0x70, 0xbe, 0x86, 0x32, 0x8f, 0x80, 0xbc, 0x6b, 0xe7, 0x2e, 0x11, 0x29, + 0xd2, 0xd8, 0x33, 0xf3, 0x7d, 0x33, 0xfb, 0xed, 0xcc, 0x2c, 0xb6, 0xf3, 0xd3, 0x90, 0x25, 0x3c, + 0xcc, 0x0b, 0x2e, 0x39, 0xcb, 0x03, 0x29, 0xa1, 0xc8, 0x3c, 0xe5, 0x91, 0xdd, 0x36, 0x6e, 0x0f, + 0x42, 0x1e, 0x72, 0x4d, 0xa9, 0x2d, 0x8d, 0xdb, 0x4e, 0xc8, 0x79, 0x98, 0x00, 0x53, 0xde, 0xb4, + 0x9c, 0x33, 0x19, 0xa7, 0x20, 0x64, 0x90, 0xe6, 0x0d, 0xe1, 0xf9, 0xad, 0xe2, 0xad, 0xd1, 0x80, + 0x4f, 0x6a, 0x30, 0x2f, 0x45, 0xa4, 0x3e, 0x3a, 0xe8, 0xfe, 0x44, 0x78, 0xf0, 0xa1, 0x84, 0xe2, + 0xeb, 0xb1, 0x56, 0x22, 0x7c, 0x38, 0x2b, 0x41, 0x48, 0x32, 0xc0, 0xdd, 0xb3, 0x3a, 0x6e, 0xa1, + 0x21, 0x1a, 0xed, 0xf9, 0xda, 0x21, 0x6f, 0x71, 0x57, 0xc8, 0xa0, 0x90, 0xd6, 0xce, 0x10, 0x8d, + 0xf6, 0xc7, 0xb6, 0xa7, 0x15, 0x79, 0xad, 0x22, 0xef, 0x53, 0xab, 0x68, 0xb2, 0x7b, 0x71, 0xe5, + 0x18, 0x8b, 0xdf, 0x0e, 0xf2, 0x75, 0x0a, 0x79, 0x8d, 0x4d, 0xc8, 0x66, 0x96, 0xf9, 0x80, 0xcc, + 0x3a, 0x81, 0x10, 0xdc, 0x11, 0x12, 0x72, 0xab, 0x33, 0x44, 0x23, 0xd3, 0x57, 0xb6, 0x7b, 0x84, + 0x0f, 0xee, 0xa8, 0x16, 0x39, 0xcf, 0x04, 0x10, 0x86, 0x7b, 0x02, 0x8a, 0x18, 0x84, 0x85, 0x86, + 0xe6, 0x68, 0x7f, 0xfc, 0xcc, 0xbb, 0xe9, 0x42, 0xc3, 0xfd, 0xa8, 0x60, 0xbf, 0xa1, 0xb9, 0xdf, + 0x11, 0x7e, 0x7c, 0x0b, 0x21, 0x36, 0xee, 0x37, 0x63, 0xd1, 0x77, 0x3f, 0x32, 0xfc, 0x36, 0x40, + 0x2c, 0xdc, 0x4b, 0x82, 0x29, 0x24, 0x42, 0x5d, 0xa3, 0x86, 0x1a, 0x9f, 0xbc, 0xc2, 0x7d, 0x11, + 0xa4, 0x79, 0x02, 0xc2, 0xda, 0xb9, 0xef, 0x64, 0x85, 0xfb, 0x2d, 0x6f, 0xf2, 0x08, 0xe3, 0x78, + 0x06, 0x99, 0x8c, 0xe7, 0x31, 0x14, 0xae, 0xdc, 0xe8, 0x50, 0x38, 0x79, 0x8f, 0xf7, 0x6e, 0xe6, + 0xab, 0x94, 0x98, 0x13, 0x56, 0x77, 0xe6, 0xd7, 0x95, 0xf3, 0x22, 0x8c, 0x65, 0x54, 0x4e, 0xbd, + 0x13, 0x9e, 0xd6, 0xcb, 0x90, 0x82, 0x8c, 0xa0, 0x14, 0xec, 0x84, 0xa7, 0x29, 0xcf, 0x58, 0xca, + 0x67, 0x90, 0xa8, 0x7e, 
0xfa, 0x9b, 0x0a, 0xf5, 0x40, 0xcf, 0x83, 0xa4, 0x04, 0x35, 0x3a, 0xd3, + 0xd7, 0xce, 0x78, 0x81, 0x70, 0xbf, 0x39, 0x96, 0xbc, 0xc1, 0x9d, 0xe3, 0x52, 0x44, 0xe4, 0x60, + 0x4b, 0x79, 0x29, 0xa2, 0x66, 0x23, 0xec, 0xa7, 0x77, 0xc3, 0xba, 0xe5, 0xae, 0x41, 0xde, 0xe1, + 0xae, 0x9a, 0x06, 0xa1, 0x1b, 0xca, 0xff, 0x96, 0xca, 0x76, 0xee, 0xc5, 0xdb, 0x5a, 0x2f, 0xd1, + 0xe4, 0xcb, 0x72, 0x45, 0x8d, 0xcb, 0x15, 0x35, 0xae, 0x57, 0x14, 0x7d, 0xab, 0x28, 0xfa, 0x51, + 0x51, 0x74, 0x51, 0x51, 0xb4, 0xac, 0x28, 0xfa, 0x53, 0x51, 0xf4, 0xb7, 0xa2, 0xc6, 0x75, 0x45, + 0xd1, 0x62, 0x4d, 0x8d, 0xe5, 0x9a, 0x1a, 0x97, 0x6b, 0x6a, 0x7c, 0xde, 0x6e, 0x49, 0x58, 0x04, + 0xf3, 0x20, 0x0b, 0x58, 0xc2, 0x4f, 0x63, 0x76, 0x7e, 0xc8, 0xb6, 0x5f, 0xc5, 0xb4, 0xa7, 0x7e, + 0x87, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x1c, 0x3a, 0x0e, 0x65, 0x89, 0x03, 0x00, 0x00, } func (this *QueryPatternsRequest) Equal(that interface{}) bool { @@ -365,7 +408,13 @@ func (this *PatternSeries) Equal(that interface{}) bool { } else if this == nil { return false } - if this.Pattern != that1.Pattern { + if that1.Identifier == nil { + if this.Identifier != nil { + return false + } + } else if this.Identifier == nil { + return false + } else if !this.Identifier.Equal(that1.Identifier) { return false } if len(this.Samples) != len(that1.Samples) { @@ -378,6 +427,54 @@ func (this *PatternSeries) Equal(that interface{}) bool { } return true } +func (this *PatternSeries_Pattern) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PatternSeries_Pattern) + if !ok { + that2, ok := that.(PatternSeries_Pattern) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Pattern != that1.Pattern { + return false + } + return true +} +func (this *PatternSeries_Labels) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PatternSeries_Labels) + if !ok { + that2, ok := that.(PatternSeries_Labels) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Labels != that1.Labels { + return false + } + return true +} func (this *PatternSample) Equal(that interface{}) bool { if that == nil { return this == nil @@ -434,15 +531,33 @@ func (this *PatternSeries) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&logproto.PatternSeries{") - s = append(s, "Pattern: "+fmt.Sprintf("%#v", this.Pattern)+",\n") + if this.Identifier != nil { + s = append(s, "Identifier: "+fmt.Sprintf("%#v", this.Identifier)+",\n") + } if this.Samples != nil { s = append(s, "Samples: "+fmt.Sprintf("%#v", this.Samples)+",\n") } s = append(s, "}") return strings.Join(s, "") } +func (this *PatternSeries_Pattern) GoString() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&logproto.PatternSeries_Pattern{` + + `Pattern:` + fmt.Sprintf("%#v", this.Pattern) + `}`}, ", ") + return s +} +func (this *PatternSeries_Labels) GoString() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&logproto.PatternSeries_Labels{` + + `Labels:` + fmt.Sprintf("%#v", this.Labels) + `}`}, ", ") + return s +} func (this *PatternSample) GoString() string { if this == nil { return "nil" @@ -715,6 +830,15 @@ func (m *PatternSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if 
m.Identifier != nil { + { + size := m.Identifier.Size() + i -= size + if _, err := m.Identifier.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } if len(m.Samples) > 0 { for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { { @@ -729,16 +853,35 @@ func (m *PatternSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x12 } } - if len(m.Pattern) > 0 { - i -= len(m.Pattern) - copy(dAtA[i:], m.Pattern) - i = encodeVarintPattern(dAtA, i, uint64(len(m.Pattern))) - i-- - dAtA[i] = 0xa - } return len(dAtA) - i, nil } +func (m *PatternSeries_Pattern) MarshalTo(dAtA []byte) (int, error) { + return m.MarshalToSizedBuffer(dAtA[:m.Size()]) +} + +func (m *PatternSeries_Pattern) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Pattern) + copy(dAtA[i:], m.Pattern) + i = encodeVarintPattern(dAtA, i, uint64(len(m.Pattern))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *PatternSeries_Labels) MarshalTo(dAtA []byte) (int, error) { + return m.MarshalToSizedBuffer(dAtA[:m.Size()]) +} + +func (m *PatternSeries_Labels) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Labels) + copy(dAtA[i:], m.Labels) + i = encodeVarintPattern(dAtA, i, uint64(len(m.Labels))) + i-- + dAtA[i] = 0x1a + return len(dAtA) - i, nil +} func (m *PatternSample) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -824,9 +967,8 @@ func (m *PatternSeries) Size() (n int) { } var l int _ = l - l = len(m.Pattern) - if l > 0 { - n += 1 + l + sovPattern(uint64(l)) + if m.Identifier != nil { + n += m.Identifier.Size() } if len(m.Samples) > 0 { for _, e := range m.Samples { @@ -837,6 +979,26 @@ func (m *PatternSeries) Size() (n int) { return n } +func (m *PatternSeries_Pattern) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Pattern) + n += 1 + l + sovPattern(uint64(l)) + return n +} +func (m *PatternSeries_Labels) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Labels) + n += 1 + l + sovPattern(uint64(l)) + return n +} func (m *PatternSample) Size() (n int) { if m == nil { return 0 @@ -896,12 +1058,32 @@ func (this *PatternSeries) String() string { } repeatedStringForSamples += "}" s := strings.Join([]string{`&PatternSeries{`, - `Pattern:` + fmt.Sprintf("%v", this.Pattern) + `,`, + `Identifier:` + fmt.Sprintf("%v", this.Identifier) + `,`, `Samples:` + repeatedStringForSamples + `,`, `}`, }, "") return s } +func (this *PatternSeries_Pattern) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PatternSeries_Pattern{`, + `Pattern:` + fmt.Sprintf("%v", this.Pattern) + `,`, + `}`, + }, "") + return s +} +func (this *PatternSeries_Labels) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PatternSeries_Labels{`, + `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, + `}`, + }, "") + return s +} func (this *PatternSample) String() string { if this == nil { return "nil" @@ -1237,7 +1419,7 @@ func (m *PatternSeries) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Pattern = string(dAtA[iNdEx:postIndex]) + m.Identifier = &PatternSeries_Pattern{string(dAtA[iNdEx:postIndex])} iNdEx = postIndex case 2: if wireType != 2 { @@ -1273,6 +1455,38 @@ func (m *PatternSeries) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var stringLen uint64 + for 
shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPattern + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthPattern + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthPattern + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Identifier = &PatternSeries_Labels{string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipPattern(dAtA[iNdEx:]) diff --git a/pkg/logproto/pattern.proto b/pkg/logproto/pattern.proto index e92a201b3a8b1..fa03742b1af75 100644 --- a/pkg/logproto/pattern.proto +++ b/pkg/logproto/pattern.proto @@ -32,7 +32,10 @@ message QueryPatternsResponse { } message PatternSeries { - string pattern = 1; + oneof identifier { + string pattern = 1; + string labels = 3; + } repeated PatternSample samples = 2; } diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index e50d8739c30ad..b44b134c8bdde 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -410,6 +410,20 @@ type VectorAggEvaluator struct { lb *labels.Builder } +func NewVectorAggEvaluator( + nextEvaluator StepEvaluator, + expr *syntax.VectorAggregationExpr, + buf []byte, + lb *labels.Builder, +) *VectorAggEvaluator { + return &VectorAggEvaluator{ + nextEvaluator: nextEvaluator, + expr: expr, + buf: buf, + lb: lb, + } +} + func (e *VectorAggEvaluator) Next() (bool, int64, StepResult) { next, ts, r := e.nextEvaluator.Next() @@ -684,9 +698,7 @@ func newRangeAggEvaluator( return nil, err } - return &RangeVectorEvaluator{ - iter: iter, - }, nil + return NewRangeVectorEvaluator(iter), nil } } @@ -696,6 +708,12 @@ type RangeVectorEvaluator struct { err error } +func NewRangeVectorEvaluator(iter RangeVectorIterator) *RangeVectorEvaluator { + return &RangeVectorEvaluator{ + iter: iter, + } +} + func (r *RangeVectorEvaluator) Next() (bool, int64, StepResult) { next := r.iter.Next() if !next { diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go index 44a8651577549..200f1480add7e 100644 --- a/pkg/logql/range_vector.go +++ b/pkg/logql/range_vector.go @@ -75,21 +75,18 @@ func newRangeVectorIterator( if err != nil { return nil, err } - return &batchRangeVectorIterator{ - iter: it, - step: step, - end: end, - selRange: selRange, - metrics: map[string]labels.Labels{}, - window: map[string]*promql.Series{}, - agg: vectorAggregator, - current: start - step, // first loop iteration will set it to start - offset: offset, - }, nil -} - -//batch - + return NewBatchRangeVectorIterator( + it, + selRange, + step, + start, + end, + offset, + vectorAggregator, + ), nil +} + +// batch type batchRangeVectorIterator struct { iter iter.PeekingSampleIterator selRange, step, end, current, offset int64 @@ -99,14 +96,32 @@ type batchRangeVectorIterator struct { agg BatchRangeVectorAggregator } +func NewBatchRangeVectorIterator( + it iter.PeekingSampleIterator, + selRange, step, start, end, offset int64, + agg BatchRangeVectorAggregator, +) RangeVectorIterator { + return &batchRangeVectorIterator{ + iter: it, + selRange: selRange, + step: step, + end: end, + current: start - step, // first loop iteration will set it to start + offset: offset, + metrics: map[string]labels.Labels{}, + window: map[string]*promql.Series{}, + agg: agg, + } +} + func (r *batchRangeVectorIterator) Next() bool { // slides the range window to the next position - 
r.current = r.current + r.step
+	r.current = r.current + r.step // on the first call this advances current from start-step to start
 	if r.current > r.end {
 		return false
 	}
 	rangeEnd := r.current
-	rangeStart := rangeEnd - r.selRange
+	rangeStart := rangeEnd - r.selRange // in nanoseconds
 	// load samples
 	r.popBack(rangeStart)
 	r.load(rangeStart, rangeEnd)
diff --git a/pkg/pattern/chunk/util.go b/pkg/pattern/chunk/util.go
new file mode 100644
index 0000000000000..8cbde3fb0474b
--- /dev/null
+++ b/pkg/pattern/chunk/util.go
@@ -0,0 +1,14 @@
+package chunk
+
+import (
+	"time"
+
+	"github.com/prometheus/common/model"
+)
+
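+// TimeResolution is the 10s granularity at which pattern samples are stored;
+// model.Time counts milliseconds, hence the conversion of the nanosecond
+// duration by dividing by 1e6. MaxChunkTime caps the span of a single chunk.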
+const (
+	TimeResolution = model.Time(int64(time.Second*10) / 1e6)
+	MaxChunkTime   = 1 * time.Hour
+)
+
+func TruncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step }
diff --git a/pkg/pattern/drain/chunk.go b/pkg/pattern/drain/chunk.go
index 9b1e34e2e3a19..8be0dd64070f8 100644
--- a/pkg/pattern/drain/chunk.go
+++ b/pkg/pattern/drain/chunk.go
@@ -7,15 +7,12 @@ import (
 	"github.com/prometheus/common/model"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/pattern/chunk"
 	"github.com/grafana/loki/v3/pkg/pattern/iter"
 )
 
 const (
-	TimeResolution = model.Time(int64(time.Second*10) / 1e6)
-
 	defaultVolumeSize = 500
-
-	maxChunkTime = 1 * time.Hour
 )
 
 type Chunks []Chunk
@@ -25,7 +22,7 @@ type Chunk struct {
 }
 
 func newChunk(ts model.Time) Chunk {
-	maxSize := int(maxChunkTime.Nanoseconds()/TimeResolution.UnixNano()) + 1
+	maxSize := int(chunk.MaxChunkTime.Nanoseconds()/chunk.TimeResolution.UnixNano()) + 1
 	v := Chunk{Samples: make([]logproto.PatternSample, 1, maxSize)}
 	v.Samples[0] = logproto.PatternSample{
 		Timestamp: ts,
@@ -39,11 +36,11 @@ func (c Chunk) spaceFor(ts model.Time) bool {
 		return true
 	}
 
-	return ts.Sub(c.Samples[0].Timestamp) < maxChunkTime
+	return ts.Sub(c.Samples[0].Timestamp) < chunk.MaxChunkTime
 }
 
 // ForRange returns samples with only the values
-// in the given range [start:end) and aggregates them by step duration.
+// in the given range [start:end] and aggregates them by step duration.
 // start and end are in milliseconds since epoch. step is a duration in milliseconds.
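+// For example, with step=4ms, samples at t=2,4,6 are re-bucketed into the
+// 0ms and 4ms steps.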
 func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
 	if len(c.Samples) == 0 {
@@ -51,7 +48,7 @@ func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
 	}
 	first := c.Samples[0].Timestamp
 	last := c.Samples[len(c.Samples)-1].Timestamp
-	if start >= end || first >= end || last < start {
+	if start >= end || first > end || last < start {
 		return nil
 	}
 	var lo int
@@ -61,17 +58,18 @@ func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
 		})
 	}
 	hi := len(c.Samples)
-	if end < last {
+
+	if end <= last {
 		hi = sort.Search(len(c.Samples), func(i int) bool {
-			return c.Samples[i].Timestamp >= end
+			return c.Samples[i].Timestamp > end
 		})
 	}
-	if step == TimeResolution {
+	if step == chunk.TimeResolution {
 		return c.Samples[lo:hi]
 	}
 
 	// Re-scale samples into step-sized buckets
-	currentStep := truncateTimestamp(c.Samples[lo].Timestamp, step)
+	currentStep := chunk.TruncateTimestamp(c.Samples[lo].Timestamp, step)
 	aggregatedSamples := make([]logproto.PatternSample, 0, ((c.Samples[hi-1].Timestamp-currentStep)/step)+1)
 	aggregatedSamples = append(aggregatedSamples, logproto.PatternSample{
 		Timestamp: currentStep,
@@ -79,7 +77,7 @@ func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
 	})
 	for _, sample := range c.Samples[lo:hi] {
 		if sample.Timestamp >= currentStep+step {
-			stepForSample := truncateTimestamp(sample.Timestamp, step)
+			stepForSample := chunk.TruncateTimestamp(sample.Timestamp, step)
 			for i := currentStep + step; i <= stepForSample; i += step {
 				aggregatedSamples = append(aggregatedSamples, logproto.PatternSample{
 					Timestamp: i,
@@ -95,7 +93,7 @@ func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
 }
 
 func (c *Chunks) Add(ts model.Time) {
-	t := truncateTimestamp(ts, TimeResolution)
+	t := chunk.TruncateTimestamp(ts, chunk.TimeResolution)
 
 	if len(*c) == 0 {
 		*c = append(*c, newChunk(t))
@@ -123,9 +121,9 @@ func (c Chunks) Iterator(pattern string, from, through, step model.Time) iter.It
 		if len(samples) == 0 {
 			continue
 		}
-		iters = append(iters, iter.NewSlice(pattern, samples))
+		iters = append(iters, iter.NewPatternSlice(pattern, samples))
 	}
-	return iter.NewNonOverlappingIterator(pattern, iters)
+	return iter.NewNonOverlappingPatternIterator(pattern, iters)
 }
 
 func (c Chunks) samples() []*logproto.PatternSample {
@@ -197,5 +195,3 @@ func (c *Chunks) size() int {
 	}
 	return size
 }
-
-func truncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step }
diff --git a/pkg/pattern/drain/chunk_test.go b/pkg/pattern/drain/chunk_test.go
index 4863a6629729a..e404a9b5da779 100644
--- a/pkg/pattern/drain/chunk_test.go
+++ b/pkg/pattern/drain/chunk_test.go
@@ -9,28 +9,29 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/pattern/chunk"
 )
 
 func TestAdd(t *testing.T) {
 	cks := Chunks{}
-	cks.Add(TimeResolution + 1)
-	cks.Add(TimeResolution + 2)
-	cks.Add(2*TimeResolution + 1)
+	cks.Add(chunk.TimeResolution + 1)
+	cks.Add(chunk.TimeResolution + 2)
+	cks.Add(2*chunk.TimeResolution + 1)
 	require.Equal(t, 1, len(cks))
 	require.Equal(t, 2, len(cks[0].Samples))
-	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1)
+	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + chunk.TimeResolution + 1)
 	require.Equal(t, 2, len(cks))
 	require.Equal(t, 1, len(cks[1].Samples))
 }
 
 func TestIterator(t *testing.T) {
 	cks := Chunks{}
-	cks.Add(TimeResolution + 1)
-	cks.Add(TimeResolution + 2)
-	cks.Add(2*TimeResolution + 1)
-	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1)
+	cks.Add(chunk.TimeResolution + 1)
+	cks.Add(chunk.TimeResolution + 2)
+	cks.Add(2*chunk.TimeResolution + 1)
+	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + chunk.TimeResolution + 1)
 
-	it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds()), TimeResolution)
+	it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds()), chunk.TimeResolution)
 	require.NotNil(t, it)
 
 	var samples []logproto.PatternSample
@@ -137,7 +138,7 @@ func TestForRange(t *testing.T) {
 			},
 		},
 		{
-			name: "Start and End Before First Element",
+			name: "Start before First and End Inclusive of First Element",
 			c: &Chunk{Samples: []logproto.PatternSample{
 				{Timestamp: 2, Value: 2},
 				{Timestamp: 4, Value: 4},
 				{Timestamp: 6, Value: 6},
 			}},
 			start: 0,
 			end:   2,
+			expected: []logproto.PatternSample{{Timestamp: 2, Value: 2}},
+		},
+		{
+			name: "Start and End before First Element",
+			c: &Chunk{Samples: []logproto.PatternSample{
+				{Timestamp: 2, Value: 2},
+				{Timestamp: 4, Value: 4},
+				{Timestamp: 6, Value: 6},
+			}},
+			start: 0,
+			end:   1,
 			expected: nil,
 		},
 		{
diff --git a/pkg/pattern/flush_test.go b/pkg/pattern/flush_test.go
index 9ee4bd436992b..d2ed2384522d3 100644
--- a/pkg/pattern/flush_test.go
+++ b/pkg/pattern/flush_test.go
@@ -62,7 +62,7 @@ func TestSweepInstance(t *testing.T) {
 		End:   time.Unix(0, math.MaxInt64),
 	})
 	require.NoError(t, err)
-	res, err := iter.ReadAll(it)
+	res, err := iter.ReadAllWithPatterns(it)
 	require.NoError(t, err)
 	require.Equal(t, 2, len(res.Series))
 	ing.sweepUsers(true, true)
@@ -72,7 +72,7 @@ func TestSweepInstance(t *testing.T) {
 		End:   time.Unix(0, math.MaxInt64),
 	})
 	require.NoError(t, err)
-	res, err = iter.ReadAll(it)
+	res, err = iter.ReadAllWithPatterns(it)
 	require.NoError(t, err)
 	require.Equal(t, 1, len(res.Series))
 }
diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go
index 1cb91a1cda299..e6174faffb6c0 100644
--- a/pkg/pattern/ingester.go
+++ b/pkg/pattern/ingester.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/httpgrpc"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/dskit/tenant"
@@ -21,6 +22,7 @@ import (
 	ring_client "github.com/grafana/dskit/ring/client"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/pattern/clientpool"
 	"github.com/grafana/loki/v3/pkg/pattern/iter"
 	"github.com/grafana/loki/v3/pkg/util"
@@ -238,17 +240,77 @@ func (i *Ingester) Query(req *logproto.QueryPatternsRequest, stream logproto.Pat
 	if err != nil {
 		return err
 	}
-	iterator, err := instance.Iterator(ctx, req)
-	if err != nil {
-		return err
+
+	expr, err := syntax.ParseExpr(req.Query)
+	if err != nil {
+		return err
+	}
+
+	switch e := expr.(type) {
+	case syntax.SampleExpr:
+		var err error
+		iterator, err := instance.QuerySample(ctx, e, req)
+		if err != nil {
+			return err
+		}
+
+		// TODO(twhitney): query store
+		// if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
+		// 	storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{
+		// 		Start:    start,
+		// 		End:      end,
+		// 		Selector: req.Selector,
+		// 		Shards:   req.Shards,
+		// 		Deletes:  req.Deletes,
+		// 		Plan:     req.Plan,
+		// 	}}
+		// 	storeItr, err := i.store.SelectSamples(ctx, storeReq)
+		// 	if err != nil {
+		// 		util.LogErrorWithContext(ctx, "closing iterator", it.Close)
+		// 		return err
+		// 	}
+
+		// 	it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr})
+		// }
+
+		defer util.LogErrorWithContext(ctx, "closing iterator", iterator.Close)
+		return sendMetricsSample(ctx, iterator, stream)
+	case syntax.LogSelectorExpr:
+		var err error
+		iterator, err := instance.Iterator(ctx, req)
+		if err != nil {
+			return err
+		}
+		defer util.LogErrorWithContext(ctx, "closing iterator", iterator.Close)
+		return sendPatternSample(ctx, iterator, stream)
+	default:
+		return httpgrpc.Errorf(
+			http.StatusBadRequest,
+			fmt.Sprintf("unexpected type (%T): cannot evaluate", e),
+		)
 	}
-	defer util.LogErrorWithContext(ctx, "closing iterator", iterator.Close)
-	return sendPatternSample(ctx, iterator, stream)
 }
 
 func sendPatternSample(ctx context.Context, it iter.Iterator, stream logproto.Pattern_QueryServer) error {
 	for ctx.Err() == nil {
-		batch, err := iter.ReadBatch(it, readBatchSize)
+		batch, err := iter.ReadPatternsBatch(it, readBatchSize)
+		if err != nil {
+			return err
+		}
+		if err := stream.Send(batch); err != nil && err != context.Canceled {
+			return err
+		}
+		if len(batch.Series) == 0 {
+			return nil
+		}
+	}
+	return nil
+}
+
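+// sendMetricsSample streams pre-aggregated bytes/count samples to the client
+// in batches; unlike sendPatternSample, the series are keyed by stream labels
+// rather than by detected pattern.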
iterator", it.Close) + // return err + // } + + // it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr}) + // } + + defer util.LogErrorWithContext(ctx, "closing iterator", iterator.Close) + return sendMetricsSample(ctx, iterator, stream) + case syntax.LogSelectorExpr: + var err error + iterator, err := instance.Iterator(ctx, req) + if err != nil { + return err + } + defer util.LogErrorWithContext(ctx, "closing iterator", iterator.Close) + return sendPatternSample(ctx, iterator, stream) + default: + return httpgrpc.Errorf( + http.StatusBadRequest, + fmt.Sprintf("unexpected type (%T): cannot evaluate", e), + ) } - defer util.LogErrorWithContext(ctx, "closing iterator", iterator.Close) - return sendPatternSample(ctx, iterator, stream) } func sendPatternSample(ctx context.Context, it iter.Iterator, stream logproto.Pattern_QueryServer) error { for ctx.Err() == nil { - batch, err := iter.ReadBatch(it, readBatchSize) + batch, err := iter.ReadPatternsBatch(it, readBatchSize) + if err != nil { + return err + } + if err := stream.Send(batch); err != nil && err != context.Canceled { + return err + } + if len(batch.Series) == 0 { + return nil + } + } + return nil +} + +func sendMetricsSample( + ctx context.Context, + it iter.Iterator, + stream logproto.Pattern_QueryServer, +) error { + for ctx.Err() == nil { + batch, err := iter.ReadMetricsBatch(it, readBatchSize) if err != nil { return err } diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index 13315b0a13f1a..e9b42010a56ea 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -2,6 +2,7 @@ package pattern import ( "context" + "errors" "math" "net/http" @@ -19,6 +20,8 @@ import ( // TODO(kolesnikovae): parametrise QueryPatternsRequest const minClusterSize = 30 +var ErrParseQuery = errors.New("only label matcher, byte_over_time, and count_over_time queries without filters are supported") + type IngesterQuerier struct { cfg Config logger log.Logger @@ -44,10 +47,24 @@ func NewIngesterQuerier( } func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) { + // validate that a supported query was provided + // TODO(twhitney): validate metric queries don't have filters + var expr syntax.Expr _, err := syntax.ParseMatchers(req.Query, true) if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + expr, err = syntax.ParseSampleExpr(req.Query) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, ErrParseQuery.Error()) + } + + switch expr.(type) { + case *syntax.VectorAggregationExpr, *syntax.RangeAggregationExpr: + break + default: + return nil, ErrParseQuery + } } + resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.PatternClient) (interface{}, error) { return client.Query(ctx, req) }) @@ -58,18 +75,30 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte for i := range resps { iterators[i] = iter.NewQueryClientIterator(resps[i].response.(logproto.Pattern_QueryClient)) } - // TODO(kolesnikovae): Incorporate with pruning - resp, err := iter.ReadBatch(iter.NewMerge(iterators...), math.MaxInt32) - if err != nil { - return nil, err + switch expr.(type) { + case *syntax.VectorAggregationExpr, *syntax.RangeAggregationExpr: + resp, err := iter.ReadMetricsBatch(iter.NewMerge(iterators...), math.MaxInt32) + if err != nil { + return nil, err + } + return resp, nil + default: + // TODO(kolesnikovae): Incorporate with 
pruning + resp, err := iter.ReadPatternsBatch(iter.NewMerge(iterators...), math.MaxInt32) + if err != nil { + return nil, err + } + return prunePatterns(resp, minClusterSize), nil } - return prunePatterns(resp, minClusterSize), nil } -func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *logproto.QueryPatternsResponse { +func prunePatterns( + resp *logproto.QueryPatternsResponse, + minClusterSize int, +) *logproto.QueryPatternsResponse { d := drain.New(drain.DefaultConfig(), nil) for _, p := range resp.Series { - d.TrainPattern(p.Pattern, p.Samples) + d.TrainPattern(p.GetPattern(), p.Samples) } resp.Series = resp.Series[:0] @@ -81,10 +110,8 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *lo if pattern == "" { continue } - resp.Series = append(resp.Series, &logproto.PatternSeries{ - Pattern: pattern, - Samples: cluster.Samples(), - }) + resp.Series = append(resp.Series, + logproto.NewPatternSeriesWithPattern(pattern, cluster.Samples())) } return resp } diff --git a/pkg/pattern/ingester_querier_test.go b/pkg/pattern/ingester_querier_test.go index d1016b326df73..c83da105ec2c6 100644 --- a/pkg/pattern/ingester_querier_test.go +++ b/pkg/pattern/ingester_querier_test.go @@ -18,9 +18,7 @@ func Test_prunePatterns(t *testing.T) { resp := new(logproto.QueryPatternsResponse) scanner := bufio.NewScanner(file) for scanner.Scan() { - resp.Series = append(resp.Series, &logproto.PatternSeries{ - Pattern: scanner.Text(), - }) + resp.Series = append(resp.Series, logproto.NewPatternSeriesWithPattern(scanner.Text(), []*logproto.PatternSample{})) } require.NoError(t, scanner.Err()) prunePatterns(resp, 0) @@ -36,7 +34,7 @@ func Test_prunePatterns(t *testing.T) { patterns := make([]string, 0, len(resp.Series)) for _, p := range resp.Series { - patterns = append(patterns, p.Pattern) + patterns = append(patterns, p.GetPattern()) } require.Equal(t, expectedPatterns, patterns) diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index eff054b9ec041..bc681e8faf02a 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -11,53 +11,268 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/pkg/push" ) func TestInstancePushQuery(t *testing.T) { - lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test")) - require.NoError(t, err) - - err = inst.Push(context.Background(), &push.PushRequest{ - Streams: []push.Stream{ - { - Labels: lbs.String(), - Entries: []push.Entry{ + t.Run("test pattern samples", func(t *testing.T) { + lbs := labels.New(labels.Label{Name: "test", Value: "test"}) + inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test")) + require.NoError(t, err) + + err = inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: lbs.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(20, 0), + Line: "ts=1 msg=hello", + }, + }, + }, + }, + }) + + err = inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: lbs.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(30, 0), + Line: "ts=2 msg=hello", + }, + }, + }, + }, + }) + for i := 0; i <= 30; i++ { + err = inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ { - Timestamp: time.Unix(20, 0), - 
Line: "ts=1 msg=hello", + Labels: lbs.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(30, 0), + Line: "foo bar foo bar", + }, + }, + }, + }, + }) + require.NoError(t, err) + } + require.NoError(t, err) + it, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{ + Query: "{test=\"test\"}", + Start: time.Unix(0, 0), + End: time.Unix(0, math.MaxInt64), + }) + require.NoError(t, err) + res, err := iter.ReadAllWithPatterns(it) + require.NoError(t, err) + require.Equal(t, 2, len(res.Series)) + + it, err = inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{ + Query: "{test=\"test\"}", + Start: time.Unix(0, 0), + End: time.Unix(30, 0), + }) + require.NoError(t, err) + res, err = iter.ReadAllWithPatterns(it) + require.NoError(t, err) + // query should be inclusive of end time to match our + // existing metric query behavior + require.Equal(t, 2, len(res.Series)) + require.Equal(t, 2, len(res.Series[0].Samples)) + }) + + t.Run("test count_over_time samples", func(t *testing.T) { + lbs := labels.New(labels.Label{Name: "test", Value: "test"}) + inst, err := newInstance("foo", log.NewNopLogger()) + require.NoError(t, err) + + err = inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: lbs.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(0, 0), + Line: "ts=1 msg=hello", + }, }, }, }, - }, + }) + for i := 1; i <= 30; i++ { + err = inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: lbs.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(int64(20*i), 0), + Line: "foo bar foo bar", + }, + }, + }, + }, + }) + require.NoError(t, err) + } + require.NoError(t, err) + + expr, err := syntax.ParseSampleExpr(`count_over_time({test="test"}[20s])`) + require.NoError(t, err) + + it, err := inst.QuerySample(context.Background(), expr, &logproto.QueryPatternsRequest{ + Query: expr.String(), + Start: time.Unix(0, 0), + End: time.Unix(int64(20*30), 0), + Step: 10000, + }) + require.NoError(t, err) + res, err := iter.ReadAllWithLabels(it) + require.NoError(t, err) + require.Equal(t, 1, len(res.Series)) + + require.Equal(t, lbs.String(), res.Series[0].GetLabels()) + + // end - start / step -- (start is 0, step is 10s) + // plus one because end is actually inclusive for metric queries + expectedDataPoints := ((20 * 30) / 10) + 1 + require.Equal(t, expectedDataPoints, len(res.Series[0].Samples)) + require.Equal(t, int64(1), res.Series[0].Samples[0].Value) + + expr, err = syntax.ParseSampleExpr(`count_over_time({test="test"}[80s])`) + require.NoError(t, err) + + it, err = inst.QuerySample(context.Background(), expr, &logproto.QueryPatternsRequest{ + Query: expr.String(), + Start: time.Unix(0, 0), + End: time.Unix(int64(20*30), 0), + Step: 10000, + }) + require.NoError(t, err) + res, err = iter.ReadAllWithLabels(it) + require.NoError(t, err) + require.Equal(t, 1, len(res.Series)) + + require.Equal(t, lbs.String(), res.Series[0].GetLabels()) + + // end - start / step -- (start is 0, step is 10s) + // plus one because end is actually inclusive for metric queries + expectedDataPoints = ((20 * 30) / 10) + 1 + require.Equal(t, expectedDataPoints, len(res.Series[0].Samples)) + + // with a larger selection range of 80s, we expect to eventually get up to 4 per datapoint + // our pushes are spaced 20s apart, and there's 10s step, so we ecpect to see the value increase + // every 2 samples, maxing out and staying at 4 after 6 samples (since it starts a 1, not 0) + require.Equal(t, 
+		require.Equal(t, int64(1), res.Series[0].Samples[0].Value)
+		require.Equal(t, int64(1), res.Series[0].Samples[1].Value)
+		require.Equal(t, int64(2), res.Series[0].Samples[2].Value)
+		require.Equal(t, int64(2), res.Series[0].Samples[3].Value)
+		require.Equal(t, int64(3), res.Series[0].Samples[4].Value)
+		require.Equal(t, int64(3), res.Series[0].Samples[5].Value)
+		require.Equal(t, int64(4), res.Series[0].Samples[6].Value)
+		require.Equal(t, int64(4), res.Series[0].Samples[expectedDataPoints-1].Value)
+	})
+
+	t.Run("test bytes_over_time samples", func(t *testing.T) {
+		lbs := labels.New(labels.Label{Name: "test", Value: "test"})
+		inst, err := newInstance("foo", log.NewNopLogger())
+		require.NoError(t, err)
+
+		err = inst.Push(context.Background(), &push.PushRequest{
+			Streams: []push.Stream{
+				{
+					Labels: lbs.String(),
+					Entries: []push.Entry{
+						{
+							Timestamp: time.Unix(0, 0),
+							Line:      "foo bar foo bars",
+						},
+					},
+				},
+			},
+		})
+		for i := 1; i <= 30; i++ {
+			err = inst.Push(context.Background(), &push.PushRequest{
+				Streams: []push.Stream{
+					{
+						Labels: lbs.String(),
+						Entries: []push.Entry{
+							{
+								Timestamp: time.Unix(int64(20*i), 0),
+								Line:      "foo bar foo bars",
+							},
+						},
+					},
+				},
+			})
+			require.NoError(t, err)
+		}
+		require.NoError(t, err)
+
+		expr, err := syntax.ParseSampleExpr(`bytes_over_time({test="test"}[20s])`)
+		require.NoError(t, err)
+
+		it, err := inst.QuerySample(context.Background(), expr, &logproto.QueryPatternsRequest{
+			Query: expr.String(),
+			Start: time.Unix(0, 0),
+			End:   time.Unix(int64(20*30), 0),
+			Step:  10000,
+		})
+		require.NoError(t, err)
+		res, err := iter.ReadAllWithLabels(it)
+		require.NoError(t, err)
+		require.Equal(t, 1, len(res.Series))
+
+		require.Equal(t, lbs.String(), res.Series[0].GetLabels())
+
+		// end - start / step -- (start is 0, step is 10s)
+		// plus one because end is actually inclusive for metric queries
+		expectedDataPoints := ((20 * 30) / 10) + 1
+		require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
+		require.Equal(t, int64(16), res.Series[0].Samples[0].Value)
+
+		expr, err = syntax.ParseSampleExpr(`bytes_over_time({test="test"}[80s])`)
+		require.NoError(t, err)
+
+		it, err = inst.QuerySample(context.Background(), expr, &logproto.QueryPatternsRequest{
+			Query: expr.String(),
+			Start: time.Unix(0, 0),
+			End:   time.Unix(int64(20*30), 0),
+			Step:  10000,
+		})
+		require.NoError(t, err)
+		res, err = iter.ReadAllWithLabels(it)
+		require.NoError(t, err)
+		require.Equal(t, 1, len(res.Series))
+
+		require.Equal(t, lbs.String(), res.Series[0].GetLabels())
+
+		// end - start / step -- (start is 0, step is 10s)
+		// plus one because end is actually inclusive for metric queries
+		expectedDataPoints = ((20 * 30) / 10) + 1
+		require.Equal(t, expectedDataPoints, len(res.Series[0].Samples))
+
+		// with a larger selection range of 80s, we expect to eventually get up to 64 bytes
+		// as each push is 16 bytes and they are spaced 20s apart. We query with a 10s step,
+		// so we expect to see the value increase by 16 bytes every 2 samples,
+		// maxing out and staying at 64 after 6 samples (since it starts at 1, not 0)
+		require.Equal(t, int64(16), res.Series[0].Samples[0].Value)
+		require.Equal(t, int64(16), res.Series[0].Samples[1].Value)
+		require.Equal(t, int64(32), res.Series[0].Samples[2].Value)
+		require.Equal(t, int64(32), res.Series[0].Samples[3].Value)
+		require.Equal(t, int64(48), res.Series[0].Samples[4].Value)
+		require.Equal(t, int64(48), res.Series[0].Samples[5].Value)
+		require.Equal(t, int64(64), res.Series[0].Samples[6].Value)
+		require.Equal(t, int64(64), res.Series[0].Samples[expectedDataPoints-1].Value)
+	})
-	require.NoError(t, err)
-	res, err := iter.ReadAll(it)
-	require.NoError(t, err)
-	require.Equal(t, 2, len(res.Series))
 }
diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go
index f6efa7de04435..cafc8dec23548 100644
--- a/pkg/pattern/instance.go
+++ b/pkg/pattern/instance.go
@@ -15,8 +15,9 @@ import (
 	"github.com/grafana/loki/v3/pkg/ingester/index"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
-	"github.com/grafana/loki/v3/pkg/pattern/drain"
+	"github.com/grafana/loki/v3/pkg/pattern/chunk"
 	"github.com/grafana/loki/v3/pkg/pattern/iter"
+	"github.com/grafana/loki/v3/pkg/pattern/metric"
 	"github.com/grafana/loki/v3/pkg/util"
 )
 
@@ -82,8 +83,8 @@ func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequ
 	}
 	from, through := util.RoundToMilliseconds(req.Start, req.End)
 	step := model.Time(req.Step)
-	if step < drain.TimeResolution {
-		step = drain.TimeResolution
+	if step < chunk.TimeResolution {
+		step = chunk.TimeResolution
 	}
 
 	var iters []iter.Iterator
@@ -101,6 +102,56 @@
 	return iter.NewMerge(iters...), nil
 }
 
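+// QuerySample evaluates a metric sample expression (bytes_over_time or
+// count_over_time) against the pre-aggregated per-stream samples held by
+// this instance, merging the per-stream iterators into one.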
+func (i *instance) QuerySample(
+	ctx context.Context,
+	expr syntax.SampleExpr,
+	req *logproto.QueryPatternsRequest,
+) (iter.Iterator, error) {
+	from, through := util.RoundToMilliseconds(req.Start, req.End)
+	step := model.Time(req.Step)
+	if step < chunk.TimeResolution {
+		step = chunk.TimeResolution
+	}
+
+	selector, err := expr.Selector()
+	if err != nil {
+		return nil, err
+	}
+
+	typ, err := metric.ExtractMetricType(expr)
+	if err != nil || typ == metric.Unsupported {
+		return nil, err
+	}
+
+	var iters []iter.Iterator
+	err = i.forMatchingStreams(
+		selector.Matchers(),
+		func(stream *stream) error {
+			var iter iter.Iterator
+			var err error
+			if typ == metric.Bytes {
+				iter, err = stream.BytesIterator(ctx, expr, from, through, step)
+			} else if typ == metric.Count {
+				iter, err = stream.CountIterator(ctx, expr, from, through, step)
+			} else {
+				return fmt.Errorf("unsupported query operation")
+			}
+
+			if err != nil {
+				return err
+			}
+
+			iters = append(iters, iter)
+			return nil
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return iter.NewMerge(iters...), nil
+}
+
 // forMatchingStreams will execute a function for each stream that matches the given matchers.
 func (i *instance) forMatchingStreams(
 	matchers []*labels.Matcher,
diff --git a/pkg/pattern/instance_test.go b/pkg/pattern/instance_test.go
new file mode 100644
index 0000000000000..7edd5364e6f12
--- /dev/null
+++ b/pkg/pattern/instance_test.go
@@ -0,0 +1,115 @@
+package pattern
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/loki/pkg/push"
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/syntax"
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
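+// TestInstance_QuerySample pushes four entries (at then, then+30s, then+1m,
+// and then+2m) and verifies the samples that count_over_time and
+// bytes_over_time queries aggregate for them.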
+func TestInstance_QuerySample(t *testing.T) {
+	ctx := context.Background()
+
+	thirtySeconds := int64(30000)
+	oneMin := int64(60000)
+	fiveMin := oneMin * 5
+	now := int64(1715964275000)
+	then := now - fiveMin // 1715963975000
+
+	mockReq := &logproto.QueryPatternsRequest{
+		Start: time.Unix(then/1000, 0),
+		End:   time.Now(),
+		Step:  oneMin,
+	}
+
+	instance, err := newInstance("test", log.NewNopLogger())
+	require.NoError(t, err)
+
+	labels := model.LabelSet{
+		model.LabelName("foo"): model.LabelValue("bar"),
+	}
+
+	lastTsMilli := (then + oneMin + oneMin) // 1715964095000
+
+	// TODO(twhitney): Add a few more pushes to this or another test
+	instance.Push(ctx, &logproto.PushRequest{
+		Streams: []push.Stream{
+			{
+				Labels: labels.String(),
+				Entries: []push.Entry{
+					{
+						Timestamp: time.Unix(then/1000, 0),
+						Line:      "this=that color=blue",
+					},
+					{
+						Timestamp: time.Unix((then+thirtySeconds)/1000, 0),
+						Line:      "this=that color=blue",
+					},
+					{
+						Timestamp: time.Unix((then+oneMin)/1000, 0),
+						Line:      "this=that color=blue",
+					},
+					{
+						Timestamp: time.Unix(lastTsMilli/1000, 0),
+						Line:      "this=that color=blue",
+					},
+				},
+				Hash: uint64(labels.Fingerprint()),
+			},
+		},
+	})
+
+	t.Run("successful count over time query", func(t *testing.T) {
+		expr, err := syntax.ParseSampleExpr(`count_over_time({foo="bar"}[30s])`)
+		require.NoError(t, err)
+
+		iter, err := instance.QuerySample(ctx, expr, mockReq)
+		assert.NoError(t, err)
+		assert.NotNil(t, iter)
+
+		// start is request start minus range, which is 30s here
+		start := then - 30000
+		require.True(t, start < lastTsMilli-30000)
+		secondPoint := start + oneMin
+		require.True(t, secondPoint < lastTsMilli-30000)
+		// this is the first point past the lastTsMilli
+		thirdPoint := secondPoint + oneMin
+		require.Equal(t, lastTsMilli-30000, thirdPoint)
+
+		next := iter.Next()
+		require.True(t, next)
+
+		sample := iter.At()
+		require.Equal(t, int64(4), sample.Value)
+		require.Equal(t, model.Time(thirdPoint), sample.Timestamp)
+
+		next = iter.Next()
+		require.False(t, next)
+	})
+
+	t.Run("successful bytes over time query", func(t *testing.T) {
+		expr, err := syntax.ParseSampleExpr(`bytes_over_time({foo="bar"}[30s])`)
+		require.NoError(t, err)
+
+		iter, err := instance.QuerySample(ctx, expr, mockReq)
+		assert.NoError(t, err)
+		assert.NotNil(t, iter)
+
+		next := iter.Next()
+		require.True(t, next)
+
+		expectedTs := (then - 30000) + oneMin + oneMin
+		sample := iter.At()
+		require.Equal(t, int64(80), sample.Value)
+		require.Equal(t, model.Time(expectedTs), sample.Timestamp)
+
+		next = iter.Next()
+		require.False(t, next)
+	})
+}
diff --git a/pkg/pattern/iter/batch.go b/pkg/pattern/iter/batch.go
index 80ad1197c80a9..159edf1d73d28 100644
--- a/pkg/pattern/iter/batch.go
+++ b/pkg/pattern/iter/batch.go
@@ -6,7 +6,33 @@ import (
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
 
-func ReadBatch(it Iterator, batchSize int) (*logproto.QueryPatternsResponse, error) {
+func ReadMetricsBatch(it 
Iterator, batchSize int) (*logproto.QueryPatternsResponse, error) { + var ( + series = map[string][]*logproto.PatternSample{} + respSize int + ) + + for ; respSize < batchSize && it.Next(); respSize++ { + labels := it.Labels() + sample := it.At() + series[labels.String()] = append(series[labels.String()], &sample) + } + result := logproto.QueryPatternsResponse{ + Series: make([]*logproto.PatternSeries, 0, len(series)), + } + for id, samples := range series { + result.Series = append( + result.Series, + logproto.NewPatternSeriesWithLabels(id, samples), + ) + } + return &result, it.Error() +} + +func ReadPatternsBatch( + it Iterator, + batchSize int, +) (*logproto.QueryPatternsResponse, error) { var ( series = map[string][]*logproto.PatternSample{} respSize int @@ -20,15 +46,19 @@ func ReadBatch(it Iterator, batchSize int) (*logproto.QueryPatternsResponse, err result := logproto.QueryPatternsResponse{ Series: make([]*logproto.PatternSeries, 0, len(series)), } - for pattern, samples := range series { - result.Series = append(result.Series, &logproto.PatternSeries{ - Pattern: pattern, - Samples: samples, - }) + for id, samples := range series { + result.Series = append( + result.Series, + logproto.NewPatternSeriesWithPattern(id, samples), + ) } return &result, it.Error() } -func ReadAll(it Iterator) (*logproto.QueryPatternsResponse, error) { - return ReadBatch(it, math.MaxInt32) +func ReadAllWithPatterns(it Iterator) (*logproto.QueryPatternsResponse, error) { + return ReadPatternsBatch(it, math.MaxInt32) +} + +func ReadAllWithLabels(it Iterator) (*logproto.QueryPatternsResponse, error) { + return ReadMetricsBatch(it, math.MaxInt32) } diff --git a/pkg/pattern/iter/batch_test.go b/pkg/pattern/iter/batch_test.go index 7f544e23f417d..d798583d1a2ac 100644 --- a/pkg/pattern/iter/batch_test.go +++ b/pkg/pattern/iter/batch_test.go @@ -32,13 +32,13 @@ func TestReadBatch(t *testing.T) { batchSize: 2, expected: &logproto.QueryPatternsResponse{ Series: []*logproto.PatternSeries{ - { - Pattern: "foo", - Samples: []*logproto.PatternSample{ + logproto.NewPatternSeriesWithPattern( + "foo", + []*logproto.PatternSample{ {Timestamp: 10, Value: 2}, {Timestamp: 20, Value: 4}, }, - }, + ), }, }, }, @@ -49,14 +49,14 @@ func TestReadBatch(t *testing.T) { batchSize: 4, expected: &logproto.QueryPatternsResponse{ Series: []*logproto.PatternSeries{ - { - Pattern: "foo", - Samples: []*logproto.PatternSample{ + logproto.NewPatternSeriesWithPattern( + "foo", + []*logproto.PatternSample{ {Timestamp: 10, Value: 2}, {Timestamp: 20, Value: 4}, {Timestamp: 30, Value: 6}, }, - }, + ), }, }, }, @@ -64,8 +64,8 @@ func TestReadBatch(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - it := NewSlice(tt.pattern, tt.samples) - got, err := ReadBatch(it, tt.batchSize) + it := NewPatternSlice(tt.pattern, tt.samples) + got, err := ReadPatternsBatch(it, tt.batchSize) require.NoError(t, err) require.Equal(t, tt.expected, got) }) diff --git a/pkg/pattern/iter/iterator.go b/pkg/pattern/iter/iterator.go index 5a277c0f27349..2f27104f0da07 100644 --- a/pkg/pattern/iter/iterator.go +++ b/pkg/pattern/iter/iterator.go @@ -1,7 +1,9 @@ package iter import ( + "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/prometheus/prometheus/model/labels" ) var Empty Iterator = &emptyIterator{} @@ -10,49 +12,75 @@ type Iterator interface { Next() bool Pattern() string + Labels() labels.Labels At() logproto.PatternSample Error() error Close() error } -func NewSlice(pattern string, s 
[]logproto.PatternSample) Iterator { - return &sliceIterator{ +type SampleIterator interface { + Iterator + Sample() logproto.PatternSample +} + +type PeekingIterator interface { + SampleIterator + Peek() (string, logproto.PatternSample, bool) +} + +func NewPatternSlice(pattern string, s []logproto.PatternSample) Iterator { + return &patternSliceIterator{ values: s, pattern: pattern, + labels: labels.EmptyLabels(), i: -1, } } -type sliceIterator struct { +func NewLabelsSlice(lbls labels.Labels, s []logproto.PatternSample) Iterator { + return &patternSliceIterator{ + values: s, + labels: lbls, + i: -1, + } +} + +type patternSliceIterator struct { i int pattern string + labels labels.Labels values []logproto.PatternSample } -func (s *sliceIterator) Next() bool { +func (s *patternSliceIterator) Next() bool { s.i++ return s.i < len(s.values) } -func (s *sliceIterator) Pattern() string { +func (s *patternSliceIterator) Pattern() string { return s.pattern } -func (s *sliceIterator) At() logproto.PatternSample { +func (s *patternSliceIterator) Labels() labels.Labels { + return s.labels +} + +func (s *patternSliceIterator) At() logproto.PatternSample { return s.values[s.i] } -func (s *sliceIterator) Error() error { +func (s *patternSliceIterator) Error() error { return nil } -func (s *sliceIterator) Close() error { +func (s *patternSliceIterator) Close() error { return nil } type emptyIterator struct { pattern string + labels labels.Labels } func (e *emptyIterator) Next() bool { @@ -63,6 +91,10 @@ func (e *emptyIterator) Pattern() string { return e.pattern } +func (e *emptyIterator) Labels() labels.Labels { + return e.labels +} + func (e *emptyIterator) At() logproto.PatternSample { return logproto.PatternSample{} } @@ -79,16 +111,25 @@ type nonOverlappingIterator struct { iterators []Iterator curr Iterator pattern string + labels labels.Labels } -// NewNonOverlappingIterator gives a chained iterator over a list of iterators. -func NewNonOverlappingIterator(pattern string, iterators []Iterator) Iterator { +// NewNonOverlappingPatternIterator gives a chained iterator over a list of iterators. +func NewNonOverlappingPatternIterator(pattern string, iterators []Iterator) Iterator { return &nonOverlappingIterator{ iterators: iterators, pattern: pattern, } } +// NewNonOverlappingLabelsIterator gives a chained iterator over a list of iterators. 
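+// It is identical to NewNonOverlappingPatternIterator except that the chained
+// entries are identified by a label set rather than a pattern string, so
+// Pattern() returns the empty string and Labels() returns the stream's labels.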
+func NewNonOverlappingLabelsIterator(labels labels.Labels, iterators []Iterator) Iterator { + return &nonOverlappingIterator{ + iterators: iterators, + labels: labels, + } +} + func (i *nonOverlappingIterator) Next() bool { for i.curr == nil || !i.curr.Next() { if len(i.iterators) == 0 { @@ -114,6 +155,10 @@ func (i *nonOverlappingIterator) Pattern() string { return i.pattern } +func (i *nonOverlappingIterator) Labels() labels.Labels { + return i.labels +} + func (i *nonOverlappingIterator) Error() error { if i.curr == nil { return nil @@ -131,3 +176,114 @@ func (i *nonOverlappingIterator) Close() error { i.iterators = nil return nil } + +type peekingIterator struct { + iter Iterator + + cache *sampleWithLabels + next *sampleWithLabels + labels labels.Labels +} + +type sampleWithLabels struct { + logproto.PatternSample + labels labels.Labels +} + +func (s *sampleWithLabels) Sample() logproto.Sample { + return logproto.Sample{ + Timestamp: s.PatternSample.Timestamp.UnixNano(), // logproto.Sample expects nano seconds + Value: float64(s.PatternSample.Value), + Hash: 0, + } +} + +func NewPeekingSampleIterator(iter Iterator) iter.PeekingSampleIterator { + // initialize the next entry so we can peek right from the start. + var cache *sampleWithLabels + next := &sampleWithLabels{} + if iter.Next() { + cache = &sampleWithLabels{ + PatternSample: iter.At(), + labels: iter.Labels(), + } + next.PatternSample = cache.PatternSample + next.labels = cache.labels + } + + return &peekingIterator{ + iter: iter, + cache: cache, + next: next, + labels: iter.Labels(), + } +} + +func (it *peekingIterator) Close() error { + return it.iter.Close() +} + +func (it *peekingIterator) Labels() string { + return it.labels.String() +} + +func (it *peekingIterator) Next() bool { + if it.cache != nil { + it.next.PatternSample = it.cache.PatternSample + it.next.labels = it.cache.labels + it.cacheNext() + return true + } + return false +} + +func (it *peekingIterator) Sample() logproto.Sample { + if it.next != nil { + return logproto.Sample{ + Timestamp: it.next.PatternSample.Timestamp.UnixNano(), // expecting nano seconds + Value: float64(it.next.PatternSample.Value), + Hash: 0, + } + } + return logproto.Sample{} +} + +func (it *peekingIterator) At() logproto.PatternSample { + if it.next != nil { + return it.next.PatternSample + } + return logproto.PatternSample{} +} + +// cacheNext caches the next element if it exists. 
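+// The peeking iterator double-buffers samples: `next` backs At() and Sample(),
+// while `cache` holds the look-ahead value served by Peek(). Next() promotes
+// cache into next and refills cache from the wrapped iterator; once the
+// wrapped iterator is exhausted, cache is set to nil and Next() returns false.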
+func (it *peekingIterator) cacheNext() { + if it.iter.Next() { + it.cache.PatternSample = it.iter.At() + it.cache.labels = it.iter.Labels() + return + } + // nothing left, remove the cached entry + it.cache = nil +} + +func (it *peekingIterator) Pattern() logproto.PatternSample { + if it.next != nil { + return it.next.PatternSample + } + return logproto.PatternSample{} +} + +func (it *peekingIterator) Peek() (string, logproto.Sample, bool) { + if it.cache != nil { + return it.cache.labels.String(), it.cache.Sample(), true + } + return "", logproto.Sample{}, false +} + +func (it *peekingIterator) Error() error { + return it.iter.Error() +} + +func (it *peekingIterator) StreamHash() uint64 { + return 0 +} diff --git a/pkg/pattern/iter/iterator_test.go b/pkg/pattern/iter/iterator_test.go index b327800575b55..dd66361d635b2 100644 --- a/pkg/pattern/iter/iterator_test.go +++ b/pkg/pattern/iter/iterator_test.go @@ -3,57 +3,111 @@ package iter import ( "testing" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" ) func TestSliceIterator(t *testing.T) { - tests := []struct { - name string - pattern string - samples []logproto.PatternSample - want []patternSample - }{ - { - name: "1 samples", - pattern: "foo", - samples: []logproto.PatternSample{ - {Timestamp: 10, Value: 2}, + t.Run("samples with pattern", func(t *testing.T) { + tests := []struct { + name string + pattern string + samples []logproto.PatternSample + want []patternSample + }{ + { + name: "1 samples", + pattern: "foo", + samples: []logproto.PatternSample{ + {Timestamp: 10, Value: 2}, + }, + want: []patternSample{ + {"foo", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 10, Value: 2}}, + }, }, - want: []patternSample{ - {"foo", logproto.PatternSample{Timestamp: 10, Value: 2}}, + { + name: "3 samples", + pattern: "foo", + samples: []logproto.PatternSample{ + {Timestamp: 10, Value: 2}, + {Timestamp: 20, Value: 4}, + {Timestamp: 30, Value: 6}, + }, + want: []patternSample{ + {"foo", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 10, Value: 2}}, + {"foo", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 20, Value: 4}}, + {"foo", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 30, Value: 6}}, + }, }, - }, - { - name: "3 samples", - pattern: "foo", - samples: []logproto.PatternSample{ - {Timestamp: 10, Value: 2}, - {Timestamp: 20, Value: 4}, - {Timestamp: 30, Value: 6}, + { + name: "empty", + pattern: "foo", + samples: nil, + want: nil, }, - want: []patternSample{ - {"foo", logproto.PatternSample{Timestamp: 10, Value: 2}}, - {"foo", logproto.PatternSample{Timestamp: 20, Value: 4}}, - {"foo", logproto.PatternSample{Timestamp: 30, Value: 6}}, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + got := slice(NewPatternSlice(tt.pattern, tt.samples)) + require.Equal(t, tt.want, got) + }) + } + }) + + t.Run("samples with labels", func(t *testing.T) { + stream := labels.Labels{ + {Name: "test", Value: "test"}, + } + tests := []struct { + name string + labels labels.Labels + samples []logproto.PatternSample + want []patternSample + }{ + { + name: "1 samples", + labels: stream, + samples: []logproto.PatternSample{ + {Timestamp: 10, Value: 2}, + }, + want: []patternSample{ + {"", stream, logproto.PatternSample{Timestamp: 10, Value: 2}}, + }, }, - }, - { - name: "empty", - pattern: "foo", - samples: nil, - want: nil, - }, - } + { + name: "3 samples", + labels: stream, + samples: []logproto.PatternSample{ + 
{Timestamp: 10, Value: 2}, + {Timestamp: 20, Value: 4}, + {Timestamp: 30, Value: 6}, + }, + want: []patternSample{ + {"", stream, logproto.PatternSample{Timestamp: 10, Value: 2}}, + {"", stream, logproto.PatternSample{Timestamp: 20, Value: 4}}, + {"", stream, logproto.PatternSample{Timestamp: 30, Value: 6}}, + }, + }, + { + name: "empty", + labels: labels.EmptyLabels(), + samples: nil, + want: nil, + }, + } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - got := slice(NewSlice(tt.pattern, tt.samples)) - require.Equal(t, tt.want, got) - }) - } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + got := slice(NewLabelsSlice(tt.labels, tt.samples)) + require.Equal(t, tt.want, got) + }) + } + }) } func slice(it Iterator) []patternSample { @@ -63,6 +117,7 @@ func slice(it Iterator) []patternSample { samples = append(samples, patternSample{ pattern: it.Pattern(), sample: it.At(), + labels: it.Labels(), }) } if it.Error() != nil { diff --git a/pkg/pattern/iter/merge.go b/pkg/pattern/iter/merge.go index 3b0e07e33b8a8..0c7c19a633a93 100644 --- a/pkg/pattern/iter/merge.go +++ b/pkg/pattern/iter/merge.go @@ -5,6 +5,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/util/loser" + "github.com/prometheus/prometheus/model/labels" ) type mergeIterator struct { @@ -16,18 +17,22 @@ type mergeIterator struct { type patternSample struct { pattern string + labels labels.Labels sample logproto.PatternSample } var max = patternSample{ pattern: "", + labels: labels.Labels{}, sample: logproto.PatternSample{Timestamp: math.MaxInt64}, } func NewMerge(iters ...Iterator) Iterator { + // TODO: I need to call next here tree := loser.New(iters, max, func(s Iterator) patternSample { return patternSample{ pattern: s.Pattern(), + labels: s.Labels(), sample: s.At(), } }, func(e1, e2 patternSample) bool { @@ -57,10 +62,13 @@ func (m *mergeIterator) Next() bool { } m.current.pattern = m.tree.Winner().Pattern() + m.current.labels = m.tree.Winner().Labels() m.current.sample = m.tree.Winner().At() for m.tree.Next() { - if m.current.sample.Timestamp != m.tree.Winner().At().Timestamp || m.current.pattern != m.tree.Winner().Pattern() { + if m.current.sample.Timestamp != m.tree.Winner().At().Timestamp || + m.current.pattern != m.tree.Winner().Pattern() || + m.current.labels.String() != m.tree.Winner().Labels().String() { return true } m.current.sample.Value += m.tree.Winner().At().Value @@ -74,6 +82,10 @@ func (m *mergeIterator) Pattern() string { return m.current.pattern } +func (m *mergeIterator) Labels() labels.Labels { + return m.current.labels +} + func (m *mergeIterator) At() logproto.PatternSample { return m.current.sample } diff --git a/pkg/pattern/iter/merge_test.go b/pkg/pattern/iter/merge_test.go index a1d643a5a01c1..d55f417062bdc 100644 --- a/pkg/pattern/iter/merge_test.go +++ b/pkg/pattern/iter/merge_test.go @@ -3,76 +3,150 @@ package iter import ( "testing" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" ) func TestMerge(t *testing.T) { - tests := []struct { - name string - iterators []Iterator - expected []patternSample - }{ - { - name: "Empty iterators", - iterators: []Iterator{}, - expected: nil, - }, - { - name: "Merge single iterator", - iterators: []Iterator{ - NewSlice("a", []logproto.PatternSample{ - {Timestamp: 10, Value: 2}, {Timestamp: 20, Value: 4}, {Timestamp: 30, Value: 6}, - }), + t.Run("merging patterns", func(t *testing.T) { + 
tests := []struct { + name string + iterators []Iterator + expected []patternSample + }{ + { + name: "Empty iterators", + iterators: []Iterator{}, + expected: nil, }, - expected: []patternSample{ - {"a", logproto.PatternSample{Timestamp: 10, Value: 2}}, - {"a", logproto.PatternSample{Timestamp: 20, Value: 4}}, - {"a", logproto.PatternSample{Timestamp: 30, Value: 6}}, + { + name: "Merge single iterator", + iterators: []Iterator{ + NewPatternSlice("a", []logproto.PatternSample{ + {Timestamp: 10, Value: 2}, {Timestamp: 20, Value: 4}, {Timestamp: 30, Value: 6}, + }), + }, + expected: []patternSample{ + {"a", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 10, Value: 2}}, + {"a", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 20, Value: 4}}, + {"a", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 30, Value: 6}}, + }, }, - }, - { - name: "Merge multiple iterators", - iterators: []Iterator{ - NewSlice("a", []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), - NewSlice("b", []logproto.PatternSample{{Timestamp: 20, Value: 4}, {Timestamp: 40, Value: 8}}), + { + name: "Merge multiple iterators", + iterators: []Iterator{ + NewPatternSlice("a", []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), + NewPatternSlice("b", []logproto.PatternSample{{Timestamp: 20, Value: 4}, {Timestamp: 40, Value: 8}}), + }, + expected: []patternSample{ + {"a", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 10, Value: 2}}, + {"b", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 20, Value: 4}}, + {"a", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 30, Value: 6}}, + {"b", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 40, Value: 8}}, + }, }, - expected: []patternSample{ - {"a", logproto.PatternSample{Timestamp: 10, Value: 2}}, - {"b", logproto.PatternSample{Timestamp: 20, Value: 4}}, - {"a", logproto.PatternSample{Timestamp: 30, Value: 6}}, - {"b", logproto.PatternSample{Timestamp: 40, Value: 8}}, + { + name: "Merge multiple iterators with similar samples", + iterators: []Iterator{ + NewPatternSlice("a", []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), + NewPatternSlice("a", []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), + NewPatternSlice("b", []logproto.PatternSample{{Timestamp: 20, Value: 4}, {Timestamp: 40, Value: 8}}), + }, + expected: []patternSample{ + {"a", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 10, Value: 4}}, + {"b", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 20, Value: 4}}, + {"a", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 30, Value: 12}}, + {"b", labels.EmptyLabels(), logproto.PatternSample{Timestamp: 40, Value: 8}}, + }, }, - }, - { - name: "Merge multiple iterators with similar samples", - iterators: []Iterator{ - NewSlice("a", []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), - NewSlice("a", []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), - NewSlice("b", []logproto.PatternSample{{Timestamp: 20, Value: 4}, {Timestamp: 40, Value: 8}}), + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + it := NewMerge(tt.iterators...) 
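+				// drain the merged iterator; samples that share a timestamp,
+				// pattern, and label set are merged by summing their values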
+ defer it.Close() + + var result []patternSample + for it.Next() { + result = append(result, patternSample{it.Pattern(), it.Labels(), it.At()}) + } + + require.Equal(t, tt.expected, result) + }) + } + }) + + t.Run("merging label samples", func(t *testing.T) { + stream1 := labels.Labels{labels.Label{Name: "foo", Value: "bar"}, labels.Label{Name: "ying", Value: "yang"}} + stream2 := labels.Labels{labels.Label{Name: "foo", Value: "baz"}, labels.Label{Name: "ying", Value: "yang"}} + tests := []struct { + name string + iterators []Iterator + expected []patternSample + }{ + { + name: "Empty iterators", + iterators: []Iterator{}, + expected: nil, + }, + { + name: "Merge single iterator", + iterators: []Iterator{ + NewLabelsSlice(stream1, []logproto.PatternSample{ + {Timestamp: 10, Value: 2}, {Timestamp: 20, Value: 4}, {Timestamp: 30, Value: 6}, + }), + }, + expected: []patternSample{ + {"", stream1, logproto.PatternSample{Timestamp: 10, Value: 2}}, + {"", stream1, logproto.PatternSample{Timestamp: 20, Value: 4}}, + {"", stream1, logproto.PatternSample{Timestamp: 30, Value: 6}}, + }, + }, + { + name: "Merge multiple iterators", + iterators: []Iterator{ + NewLabelsSlice(stream1, []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), + NewLabelsSlice(stream2, []logproto.PatternSample{{Timestamp: 20, Value: 4}, {Timestamp: 40, Value: 8}}), + }, + expected: []patternSample{ + {"", stream1, logproto.PatternSample{Timestamp: 10, Value: 2}}, + {"", stream2, logproto.PatternSample{Timestamp: 20, Value: 4}}, + {"", stream1, logproto.PatternSample{Timestamp: 30, Value: 6}}, + {"", stream2, logproto.PatternSample{Timestamp: 40, Value: 8}}, + }, }, - expected: []patternSample{ - {"a", logproto.PatternSample{Timestamp: 10, Value: 4}}, - {"b", logproto.PatternSample{Timestamp: 20, Value: 4}}, - {"a", logproto.PatternSample{Timestamp: 30, Value: 12}}, - {"b", logproto.PatternSample{Timestamp: 40, Value: 8}}, + { + name: "Merge multiple iterators with similar samples", + iterators: []Iterator{ + NewLabelsSlice(stream1, []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), + NewLabelsSlice(stream1, []logproto.PatternSample{{Timestamp: 10, Value: 2}, {Timestamp: 30, Value: 6}}), + NewLabelsSlice(stream2, []logproto.PatternSample{{Timestamp: 20, Value: 4}, {Timestamp: 40, Value: 8}}), + }, + expected: []patternSample{ + {"", stream1, logproto.PatternSample{Timestamp: 10, Value: 4}}, + {"", stream2, logproto.PatternSample{Timestamp: 20, Value: 4}}, + {"", stream1, logproto.PatternSample{Timestamp: 30, Value: 12}}, + {"", stream2, logproto.PatternSample{Timestamp: 40, Value: 8}}, + }, }, - }, - } + } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - it := NewMerge(tt.iterators...) - defer it.Close() + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + it := NewMerge(tt.iterators...) 
+ defer it.Close() - var result []patternSample - for it.Next() { - result = append(result, patternSample{it.Pattern(), it.At()}) - } + var result []patternSample + for it.Next() { + result = append(result, patternSample{it.Pattern(), it.Labels(), it.At()}) + } - require.Equal(t, tt.expected, result) - }) - } + require.Equal(t, tt.expected, result) + }) + } + }) } diff --git a/pkg/pattern/iter/query_client.go b/pkg/pattern/iter/query_client.go index f6c5c4fa97744..00ed407a39e9b 100644 --- a/pkg/pattern/iter/query_client.go +++ b/pkg/pattern/iter/query_client.go @@ -4,6 +4,8 @@ import ( "io" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" ) type queryClientIterator struct { @@ -38,6 +40,10 @@ func (i *queryClientIterator) Pattern() string { return i.curr.Pattern() } +func (i *queryClientIterator) Labels() labels.Labels { + return i.curr.Labels() +} + func (i *queryClientIterator) At() logproto.PatternSample { return i.curr.At() } @@ -58,7 +64,17 @@ func NewQueryResponseIterator(resp *logproto.QueryPatternsResponse) Iterator { for j, sample := range s.Samples { samples[j] = *sample } - iters[i] = NewSlice(s.Pattern, samples) + + switch s.GetIdentifier().(type) { + case *logproto.PatternSeries_Labels: + ls, err := parser.ParseMetric(s.GetLabels()) + if err != nil { + ls = labels.Labels{} + } + iters[i] = NewLabelsSlice(ls, samples) + default: + iters[i] = NewPatternSlice(s.GetPattern(), samples) + } } return NewMerge(iters...) } diff --git a/pkg/pattern/metric/chunk.go b/pkg/pattern/metric/chunk.go new file mode 100644 index 0000000000000..c4716ba532dd1 --- /dev/null +++ b/pkg/pattern/metric/chunk.go @@ -0,0 +1,201 @@ +package metric + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/pattern/chunk" + "github.com/grafana/loki/v3/pkg/pattern/iter" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" +) + +type MetricType int + +const ( + Bytes MetricType = iota + Count + Unsupported +) + +type Chunks struct { + chunks []Chunk + labels labels.Labels +} + +func NewChunks(labels labels.Labels) *Chunks { + return &Chunks{ + chunks: make([]Chunk, 0), + labels: labels, + } +} + +func (c *Chunks) Observe(bytes, count uint64, ts model.Time) { + if len(c.chunks) == 0 { + c.chunks = append(c.chunks, newChunk(bytes, count, ts)) + return + } + + last := &(c.chunks)[len(c.chunks)-1] + if !last.spaceFor(ts) { + c.chunks = append(c.chunks, newChunk(bytes, count, ts)) + return + } + + last.AddSample(newSample(bytes, count, ts)) +} + +func (c *Chunks) Iterator( + ctx context.Context, + typ MetricType, + from, through, step model.Time, +) (iter.Iterator, error) { + if typ == Unsupported { + return nil, fmt.Errorf("unsupported metric type") + } + + iters := make([]iter.Iterator, 0, len(c.chunks)) + for _, chunk := range c.chunks { + samples, err := chunk.ForRangeAndType(typ, from, through, step) + if err != nil { + return nil, err + } + + if len(samples) == 0 { + continue + } + + iters = append(iters, iter.NewLabelsSlice(c.labels, samples)) + } + return iter.NewNonOverlappingLabelsIterator(c.labels, iters), nil +} + +// TODO(twhitney): These values should be float64s (to match prometheus samples) or int64s (to match pattern samples) +type MetricSample struct { + Timestamp model.Time + Bytes uint64 + Count uint64 +} + +func newSample(bytes, count uint64, ts model.Time) MetricSample { + return MetricSample{ + 
Timestamp: ts, + Bytes: bytes, + Count: count, + } +} + +type MetricSamples []MetricSample + +type Chunk struct { + Samples MetricSamples + mint, maxt int64 +} + +func (c *Chunk) Bounds() (fromT, toT time.Time) { + return time.Unix(0, c.mint), time.Unix(0, c.maxt) +} + +func (c *Chunk) AddSample(s MetricSample) { + c.Samples = append(c.Samples, s) + ts := int64(s.Timestamp) + + if ts < c.mint { + c.mint = ts + } + + if ts > c.maxt { + c.maxt = ts + } +} + +func newChunk(bytes, count uint64, ts model.Time) Chunk { + maxSize := int(chunk.MaxChunkTime.Nanoseconds()/chunk.TimeResolution.UnixNano()) + 1 + v := Chunk{Samples: make(MetricSamples, 1, maxSize)} + v.Samples[0] = newSample(bytes, count, ts) + return v +} + +func (c *Chunk) spaceFor(ts model.Time) bool { + if len(c.Samples) == 0 { + return true + } + + return ts.Sub(c.Samples[0].Timestamp) < chunk.MaxChunkTime +} + +//TODO(twhitney): any way to remove the duplication between this and the drain chunk ForRange method? +// ForRangeAndType returns samples with only the values +// in the given range [start:end] and aggregates them by step duration. +// start and end are in milliseconds since epoch. step is a duration in milliseconds. +func (c *Chunk) ForRangeAndType( + typ MetricType, + start, end, step model.Time, +) ([]logproto.PatternSample, error) { + if typ == Unsupported { + return nil, fmt.Errorf("unsupported metric type") + } + + if len(c.Samples) == 0 { + return nil, nil + } + + first := c.Samples[0].Timestamp // why is this in the future? + last := c.Samples[len(c.Samples)-1].Timestamp + startBeforeEnd := start >= end + samplesAreAfterRange := first > end + samplesAreBeforeRange := last < start + if startBeforeEnd || samplesAreAfterRange || samplesAreBeforeRange { + return nil, nil + } + + var lo int + if start > first { + lo = sort.Search(len(c.Samples), func(i int) bool { + return c.Samples[i].Timestamp >= start + }) + } + hi := len(c.Samples) + if end < last { + hi = sort.Search(len(c.Samples), func(i int) bool { + return c.Samples[i].Timestamp > end + }) + } + + // Re-scale samples into step-sized buckets + currentStep := chunk.TruncateTimestamp(c.Samples[lo].Timestamp, step) + numOfSteps := ((c.Samples[hi-1].Timestamp - currentStep) / step) + 1 + aggregatedSamples := make([]logproto.PatternSample, 0, numOfSteps) + aggregatedSamples = append(aggregatedSamples, + logproto.PatternSample{ + Timestamp: currentStep, + Value: 0, + }) + + for _, sample := range c.Samples[lo:hi] { + if sample.Timestamp >= currentStep+step { + stepForSample := chunk.TruncateTimestamp(sample.Timestamp, step) + for i := currentStep + step; i <= stepForSample; i += step { + aggregatedSamples = append(aggregatedSamples, logproto.PatternSample{ + Timestamp: i, + Value: 0, + }) + } + currentStep = stepForSample + } + + var v int64 + if typ == Bytes { + v = int64(sample.Bytes) + } else { + v = int64(sample.Count) + } + + aggregatedSamples[len(aggregatedSamples)-1].Value += v + } + + return aggregatedSamples, nil +} diff --git a/pkg/pattern/metric/chunk_test.go b/pkg/pattern/metric/chunk_test.go new file mode 100644 index 0000000000000..32b746b0d20d0 --- /dev/null +++ b/pkg/pattern/metric/chunk_test.go @@ -0,0 +1,329 @@ +package metric + +import ( + "reflect" + "testing" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func TestForRangeAndType(t *testing.T) { + testCases := []struct { + name string + c *Chunk + metricType MetricType + start model.Time + end model.Time + expected 
[]logproto.PatternSample + }{ + { + name: "Empty count", + c: &Chunk{}, + metricType: Count, + start: 1, + end: 10, + expected: nil, + }, + { + name: "Empty bytes", + c: &Chunk{}, + metricType: Bytes, + start: 1, + end: 10, + expected: nil, + }, + { + name: "No Overlap -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Count: 2}, + {Timestamp: 4, Count: 4}, + {Timestamp: 6, Count: 6}, + }}, + metricType: Count, + start: 10, + end: 20, + expected: nil, + }, + { + name: "No Overlap -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Bytes: 2}, + {Timestamp: 4, Bytes: 4}, + {Timestamp: 6, Bytes: 6}, + }}, + metricType: Bytes, + start: 10, + end: 20, + expected: nil, + }, + { + name: "Complete Overlap -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Count: 2}, + {Timestamp: 4, Count: 4}, + {Timestamp: 6, Count: 6}, + }}, + metricType: Count, + start: 0, + end: 10, + expected: []logproto.PatternSample{ + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, + }, + }, + { + name: "Complete Overlap -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Bytes: 2}, + {Timestamp: 4, Bytes: 4}, + {Timestamp: 6, Bytes: 6}, + }}, + metricType: Bytes, + start: 0, + end: 10, + expected: []logproto.PatternSample{ + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + {Timestamp: 6, Value: 6}, + }, + }, + { + name: "Partial Overlap -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Count: 2}, + {Timestamp: 4, Count: 4}, + {Timestamp: 6, Count: 6}, + }}, + metricType: Count, + start: 3, + end: 5, + expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}}, + }, + { + name: "Partial Overlap -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Bytes: 2}, + {Timestamp: 4, Bytes: 4}, + {Timestamp: 6, Bytes: 6}, + }}, + metricType: Bytes, + start: 3, + end: 5, + expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}}, + }, + { + name: "Single Element in Range -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Count: 2}, + {Timestamp: 4, Count: 4}, + {Timestamp: 6, Count: 6}, + }}, + metricType: Count, + start: 4, + end: 5, + expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}}, + }, + { + name: "Single Element in Range -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Bytes: 2}, + {Timestamp: 4, Bytes: 4}, + {Timestamp: 6, Bytes: 6}, + }}, + metricType: Bytes, + start: 4, + end: 5, + expected: []logproto.PatternSample{{Timestamp: 4, Value: 4}}, + }, + { + name: "Start Before First Element -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Count: 2}, + {Timestamp: 4, Count: 4}, + {Timestamp: 6, Count: 6}, + }}, + metricType: Count, + start: 0, + end: 5, + expected: []logproto.PatternSample{ + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + }, + }, + { + name: "Start Before First Element -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Bytes: 2}, + {Timestamp: 4, Bytes: 4}, + {Timestamp: 6, Bytes: 6}, + }}, + metricType: Bytes, + start: 0, + end: 5, + expected: []logproto.PatternSample{ + {Timestamp: 2, Value: 2}, + {Timestamp: 4, Value: 4}, + }, + }, + { + name: "End After Last Element -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Count: 2}, + {Timestamp: 4, Count: 4}, + {Timestamp: 6, Count: 6}, + }}, + metricType: Count, + start: 5, + end: 10, + expected: []logproto.PatternSample{ + {Timestamp: 6, Value: 6}, + }, + }, + { + name: "End After Last Element -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 
2, Bytes: 2}, + {Timestamp: 4, Bytes: 4}, + {Timestamp: 6, Bytes: 6}, + }}, + metricType: Bytes, + start: 5, + end: 10, + expected: []logproto.PatternSample{ + {Timestamp: 6, Value: 6}, + }, + }, + { + name: "Start before First and End Inclusive of First Element -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Count: 2}, + {Timestamp: 4, Count: 4}, + {Timestamp: 6, Count: 6}, + }}, + metricType: Count, + start: 0, + end: 2, + expected: []logproto.PatternSample{{Timestamp: 2, Value: 2}}, + }, + { + name: "Start before First and End Inclusive of First Element -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Bytes: 2}, + {Timestamp: 4, Bytes: 4}, + {Timestamp: 6, Bytes: 6}, + }}, + metricType: Bytes, + start: 0, + end: 2, + expected: []logproto.PatternSample{{Timestamp: 2, Value: 2}}, + }, + { + name: "Start and End before First Element -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Count: 2}, + {Timestamp: 4, Count: 4}, + {Timestamp: 6, Count: 6}, + }}, + metricType: Count, + start: 0, + end: 1, + expected: nil, + }, + { + name: "Start and End before First Element -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 2, Bytes: 2}, + {Timestamp: 4, Bytes: 4}, + {Timestamp: 6, Bytes: 6}, + }}, + metricType: Bytes, + start: 0, + end: 1, + expected: nil, + }, + { + name: "Higher resolution samples down-sampled to preceding step bucket -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 1, Count: 2}, + {Timestamp: 2, Count: 4}, + {Timestamp: 3, Count: 6}, + {Timestamp: 4, Count: 8}, + {Timestamp: 5, Count: 10}, + {Timestamp: 6, Count: 12}, + }}, + metricType: Count, + start: 1, + end: 6, + expected: []logproto.PatternSample{ + {Timestamp: 0, Value: 2}, + {Timestamp: 2, Value: 10}, + {Timestamp: 4, Value: 18}, + {Timestamp: 6, Value: 12}, + }, + }, + { + name: "Higher resolution samples down-sampled to preceding step bucket -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 1, Bytes: 2}, + {Timestamp: 2, Bytes: 4}, + {Timestamp: 3, Bytes: 6}, + {Timestamp: 4, Bytes: 8}, + {Timestamp: 5, Bytes: 10}, + {Timestamp: 6, Bytes: 12}, + }}, + metricType: Bytes, + start: 1, + end: 6, + expected: []logproto.PatternSample{ + {Timestamp: 0, Value: 2}, + {Timestamp: 2, Value: 10}, + {Timestamp: 4, Value: 18}, + {Timestamp: 6, Value: 12}, + }, + }, + { + name: "Low resolution samples insert 0 values for empty steps -- count", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 1, Count: 2}, + {Timestamp: 5, Count: 10}, + }}, + metricType: Count, + start: 1, + end: 6, + expected: []logproto.PatternSample{ + {Timestamp: 0, Value: 2}, + {Timestamp: 2, Value: 0}, + {Timestamp: 4, Value: 10}, + }, + }, + { + name: "Low resolution samples insert 0 values for empty steps -- bytes", + c: &Chunk{Samples: MetricSamples{ + {Timestamp: 1, Bytes: 2}, + {Timestamp: 5, Bytes: 10}, + }}, + metricType: Bytes, + start: 1, + end: 6, + expected: []logproto.PatternSample{ + {Timestamp: 0, Value: 2}, + {Timestamp: 2, Value: 0}, + {Timestamp: 4, Value: 10}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result, err := tc.c.ForRangeAndType(tc.metricType, tc.start, tc.end, model.Time(2)) + require.NoError(t, err) + if !reflect.DeepEqual(result, tc.expected) { + t.Errorf("Expected %v, got %v", tc.expected, result) + } + require.Equal(t, len(result), cap(result), "Returned slice wasn't created at the correct capacity") + }) + } +} diff --git a/pkg/pattern/metric/evaluator.go b/pkg/pattern/metric/evaluator.go new file mode 
100644 index 0000000000000..a1e301f31276b --- /dev/null +++ b/pkg/pattern/metric/evaluator.go @@ -0,0 +1,354 @@ +package metric + +import ( + "context" + "fmt" + "sort" + "time" + + loki_iter "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/iter" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" +) + +// TODO(twhitney): duplication with code in NewStepEvaluator +func ExtractMetricType(expr syntax.SampleExpr) (MetricType, error) { + var typ MetricType + switch e := expr.(type) { + case *syntax.VectorAggregationExpr: + if rangeExpr, ok := e.Left.(*syntax.RangeAggregationExpr); ok && e.Operation == syntax.OpTypeSum { + if rangeExpr.Operation == syntax.OpRangeTypeCount { + typ = Count + } else if rangeExpr.Operation == syntax.OpRangeTypeBytes { + typ = Bytes + } else { + return Unsupported, fmt.Errorf("unsupported aggregation operation %s", e.Operation) + } + } else { + return Unsupported, fmt.Errorf("unsupported aggregation operation %s", e.Operation) + } + case *syntax.RangeAggregationExpr: + if e.Operation == syntax.OpRangeTypeCount { + typ = Count + } else if e.Operation == syntax.OpRangeTypeBytes { + typ = Bytes + } else { + return Unsupported, fmt.Errorf("unsupported aggregation operation %s", e.Operation) + } + default: + return Unsupported, fmt.Errorf("unexpected expression type %T", e) + } + return typ, nil +} + +type SampleEvaluatorFactory interface { + // NewStepEvaluator returns a NewStepEvaluator for a given SampleExpr. + // It's explicitly passed another NewStepEvaluator + // in order to enable arbitrary computation of embedded expressions. This allows more modular & extensible + // NewStepEvaluator implementations which can be composed. + NewStepEvaluator( + ctx context.Context, + nextEvaluatorFactory SampleEvaluatorFactory, + expr syntax.SampleExpr, + typ MetricType, + from, through, step model.Time, + ) (logql.StepEvaluator, error) +} + +type SampleEvaluatorFunc func( + ctx context.Context, + nextEvaluatorFactory SampleEvaluatorFactory, + expr syntax.SampleExpr, + typ MetricType, + from, through, step model.Time, +) (logql.StepEvaluator, error) + +func (s SampleEvaluatorFunc) NewStepEvaluator( + ctx context.Context, + nextEvaluatorFactory SampleEvaluatorFactory, + expr syntax.SampleExpr, + typ MetricType, + from, through, step model.Time, +) (logql.StepEvaluator, error) { + return s(ctx, nextEvaluatorFactory, expr, typ, from, through, step) +} + +type DefaultEvaluatorFactory struct { + chunks *Chunks +} + +func NewDefaultEvaluatorFactory(chunks *Chunks) *DefaultEvaluatorFactory { + return &DefaultEvaluatorFactory{ + chunks: chunks, + } +} + +func (ev *DefaultEvaluatorFactory) NewStepEvaluator( + ctx context.Context, + evFactory SampleEvaluatorFactory, + expr syntax.SampleExpr, + typ MetricType, + from, through, step model.Time, +) (logql.StepEvaluator, error) { + switch e := expr.(type) { + case *syntax.VectorAggregationExpr: + if rangExpr, ok := e.Left.(*syntax.RangeAggregationExpr); ok && e.Operation == syntax.OpTypeSum { + // if range expression is wrapped with a vector expression + // we should send the vector expression for allowing reducing labels at the source. 
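+			// The inner evaluator ignores the expression it is handed and reads
+			// samples directly from this stream's metric chunks, widening the
+			// query start by the range interval and offset so the first step
+			// already has a full window of samples to aggregate over.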
+			evFactory = SampleEvaluatorFunc(
+				func(ctx context.Context,
+					_ SampleEvaluatorFactory,
+					_ syntax.SampleExpr,
+					typ MetricType,
+					from, through, step model.Time,
+				) (logql.StepEvaluator, error) {
+					fromWithRangeAndOffset := from.Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset)
+					throughWithOffset := through.Add(-rangExpr.Left.Offset)
+					it, err := ev.chunks.Iterator(ctx, typ, fromWithRangeAndOffset, throughWithOffset, step)
+					if err != nil {
+						return nil, err
+					}
+
+					params := NewParams(
+						e,
+						from.Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset),
+						through.Add(-rangExpr.Left.Offset),
+						step,
+					)
+					return NewPatternSampleRangeAggEvaluator(iter.NewPeekingSampleIterator(it), rangExpr, params, rangExpr.Left.Offset)
+				})
+		}
+
+		if e.Grouping == nil {
+			return nil, errors.Errorf("aggregation operator %q without grouping", e.Operation)
+		}
+		nextEvaluator, err := evFactory.NewStepEvaluator(ctx, evFactory, e.Left, typ, from, through, step)
+		if err != nil {
+			return nil, err
+		}
+		sort.Strings(e.Grouping.Groups)
+
+		return logql.NewVectorAggEvaluator(
+			nextEvaluator,
+			e,
+			make([]byte, 0, 1024),
+			labels.NewBuilder(labels.Labels{}),
+		), nil
+
+	case *syntax.RangeAggregationExpr:
+		fromWithRangeAndOffset := from.Add(-e.Left.Interval).Add(-e.Left.Offset)
+		throughWithOffset := through.Add(-e.Left.Offset)
+		it, err := ev.chunks.Iterator(ctx, typ, fromWithRangeAndOffset, throughWithOffset, step)
+		if err != nil {
+			return nil, err
+		}
+
+		params := NewParams(
+			e,
+			from.Add(-e.Left.Interval).Add(-e.Left.Offset),
+			through.Add(-e.Left.Offset),
+			step, // expecting nanoseconds
+		)
+		return NewPatternSampleRangeAggEvaluator(iter.NewPeekingSampleIterator(it), e, params, e.Left.Offset)
+	default:
+		return nil, errors.Errorf("unexpected expr type (%T)", e)
+	}
+}
+
+// We need our own StepEvaluator because we only support bytes_over_time and
+// count_over_time, and we always sum to get those values. In order to
+// accomplish this we need control over the aggregation operation.
+func NewPatternSampleRangeAggEvaluator(
+	it loki_iter.PeekingSampleIterator,
+	expr *syntax.RangeAggregationExpr,
+	q logql.Params,
+	o time.Duration,
+) (logql.StepEvaluator, error) {
+	iter, err := newRangeVectorIterator(
+		it, expr,
+		expr.Left.Interval.Nanoseconds(),
+		q.Step().Nanoseconds(),
+		q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return logql.NewRangeVectorEvaluator(iter), nil
+}
+
+func newRangeVectorIterator(
+	it loki_iter.PeekingSampleIterator,
+	expr *syntax.RangeAggregationExpr,
+	selRange, step, start, end, offset int64,
+) (logql.RangeVectorIterator, error) {
+	// forces at least one step.
+	if step == 0 {
+		step = 1
+	}
+	if offset != 0 {
+		start = start - offset
+		end = end - offset
+	}
+	// TODO(twhitney): do I need a streaming aggregator?
+	// if so the aggregator is going to make this
+	// a bit of a bad time, as there's currently no
+	// way to provide a custom one.
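+	// The commented-out block below sketches what wiring up logql's streaming
+	// range-vector path would look like, should a custom streaming aggregator
+	// become available: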
+ // + // var overlap bool + // if selRange >= step && start != end { + // overlap = true + // } + // if !overlap { + // _, err := streamingAggregator(expr) + // if err != nil { + // return nil, err + // } + // return &streamRangeVectorIterator{ + // iter: it, + // step: step, + // end: end, + // selRange: selRange, + // metrics: map[string]labels.Labels{}, + // r: expr, + // current: start - step, // first loop iteration will set it to start + // offset: offset, + // }, nil + // } + + // always sum + aggregator := logql.BatchRangeVectorAggregator(func(samples []promql.FPoint) float64 { + sum := 0.0 + for _, v := range samples { + sum += v.F + } + return sum + }) + + return logql.NewBatchRangeVectorIterator( + it, + selRange, + step, + start, + end, + offset, + aggregator, + ), nil +} + +type SeriesToSampleIterator struct { + floats []promql.FPoint + curTs int64 + cur float64 + lbls labels.Labels +} + +func NewSeriesToSampleIterator(series *promql.Series) *SeriesToSampleIterator { + return &SeriesToSampleIterator{ + floats: series.Floats, + lbls: series.Metric, + } +} + +func (s *SeriesToSampleIterator) Next() bool { + if len(s.floats) == 0 { + return false + } + + current, rest := s.floats[0], s.floats[1:] + + //Is timestamp the correct unit here? + s.curTs = current.T + s.cur = current.F + + s.floats = rest + return true +} + +func (s *SeriesToSampleIterator) Pattern() string { + return "" +} + +func (s *SeriesToSampleIterator) Labels() labels.Labels { + return s.lbls +} + +func (s *SeriesToSampleIterator) At() logproto.PatternSample { + return logproto.PatternSample{ + Timestamp: model.Time(s.curTs), + Value: int64(s.cur), + } +} + +func (s *SeriesToSampleIterator) Error() error { + return nil +} + +func (s *SeriesToSampleIterator) Close() error { + return nil +} + +type paramCompat struct { + expr syntax.SampleExpr + from model.Time + through model.Time + step model.Time +} + +func NewParams( + expr syntax.SampleExpr, + from, through, step model.Time, +) *paramCompat { + return ¶mCompat{ + expr: expr, + from: from, + through: through, + step: step, + } +} + +func (p *paramCompat) QueryString() string { + return p.expr.String() +} + +func (p *paramCompat) Start() time.Time { + return p.from.Time() +} + +func (p *paramCompat) End() time.Time { + return p.through.Time() +} + +func (p *paramCompat) Step() time.Duration { + return time.Duration(p.step.UnixNano()) +} + +func (p *paramCompat) Interval() time.Duration { + return time.Duration(0) +} + +func (p *paramCompat) Limit() uint32 { + return 0 +} + +func (p *paramCompat) Direction() logproto.Direction { + return logproto.BACKWARD +} + +func (p *paramCompat) Shards() []string { + return []string{} +} + +func (p *paramCompat) GetExpression() syntax.Expr { + return p.expr +} + +func (p *paramCompat) GetStoreChunks() *logproto.ChunkRefGroup { + return nil +} diff --git a/pkg/pattern/metric/evaluator_test.go b/pkg/pattern/metric/evaluator_test.go new file mode 100644 index 0000000000000..9f23cb5546e6f --- /dev/null +++ b/pkg/pattern/metric/evaluator_test.go @@ -0,0 +1,363 @@ +package metric + +import ( + "context" + "testing" + + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func Test_SampleEvaluator(t *testing.T) { + fiveMin := int64(300000) + stream := labels.Labels{ + labels.Label{ + Name: "foo", + Value: "bar", + }, + labels.Label{ + Name: "level", + Value: 
"debug", + }, + } + + setup := func(chunks Chunks, now int64, query string) logql.StepEvaluator { + factory := NewDefaultEvaluatorFactory(&chunks) + + expr, err := syntax.ParseSampleExpr(query) + require.NoError(t, err) + + typ, err := ExtractMetricType(expr) + require.NoError(t, err) + + evaluator, err := factory.NewStepEvaluator( + context.Background(), + factory, + expr.(syntax.SampleExpr), + typ, + model.Time(now-fiveMin), model.Time(now), model.Time(fiveMin), + ) + + require.NoError(t, err) + return evaluator + } + + chunks := func(now, then, beforeThen int64) Chunks { + nowTime := model.Time(now) + thenTime := model.Time(then) + beforeThenTime := model.Time(beforeThen) + return Chunks{ + chunks: []Chunk{ + { + Samples: []MetricSample{ + { + Timestamp: beforeThenTime, + Bytes: 1, + Count: 1, + }, + { + Timestamp: thenTime, + Bytes: 3, + Count: 2, + }, + { + Timestamp: nowTime, + Bytes: 5, + Count: 3, + }, + }, + mint: thenTime.Unix(), + maxt: nowTime.Unix(), + }, + }, + labels: stream, + } + } + + t.Run("grouping", func(t *testing.T) { + group := labels.Labels{ + labels.Label{ + Name: "level", + Value: "debug", + }, + } + t.Run("evenly aligned, non-overlapping timestamps", func(t *testing.T) { + now := int64(1715964275000) + then := now - fiveMin // 1715963975000 -- 5m before n + beforeThen := then - fiveMin // 1715963675000 -- 5m before then + chks := chunks(now, then, beforeThen) + + t.Run("count", func(t *testing.T) { + evaluator := setup(chks, now, `sum by (level)(count_over_time({foo="bar"}[5m]))`) + + resultTs := make([]int64, 3) + resultVals := make([]float64, 3) + + for i := 0; i < 3; i++ { + ok, ts, r := evaluator.Next() + require.True(t, ok) + require.True(t, then-fiveMin <= ts && ts <= now) + + // The []promqlSample in the Vector is reused, so we need to copy the value and timestamp. + resultTs[i] = r.SampleVector()[0].T + resultVals[i] = r.SampleVector()[0].F + + require.Equal(t, group, r.SampleVector()[0].Metric) + } + + ok, _, _ := evaluator.Next() + require.False(t, ok) + + require.Equal(t, beforeThen, resultTs[0]) + require.Equal(t, then, resultTs[1]) + require.Equal(t, now, resultTs[2]) + + require.Equal(t, float64(1), resultVals[0]) + require.Equal(t, float64(2), resultVals[1]) + require.Equal(t, float64(3), resultVals[2]) + }) + + t.Run("bytes", func(t *testing.T) { + evaluator := setup(chks, now, `sum by (level)(bytes_over_time({foo="bar"}[5m]))`) + + resultTs := make([]int64, 3) + resultVals := make([]float64, 3) + + for i := 0; i < 3; i++ { + ok, ts, r := evaluator.Next() + require.True(t, ok) + require.True(t, then-fiveMin <= ts && ts <= now) + + // The []promqlSample in the Vector is reused, so we need to copy the value and timestamp. 
+ resultTs[i] = r.SampleVector()[0].T + resultVals[i] = r.SampleVector()[0].F + + require.Equal(t, group, r.SampleVector()[0].Metric) + } + + ok, _, _ := evaluator.Next() + require.False(t, ok) + + require.Equal(t, beforeThen, resultTs[0]) + require.Equal(t, then, resultTs[1]) + require.Equal(t, now, resultTs[2]) + + require.Equal(t, float64(1), resultVals[0]) + require.Equal(t, float64(3), resultVals[1]) // TODO: got 2, expected 3 + require.Equal(t, float64(5), resultVals[2]) + }) + }) + + t.Run("evenly aligned, overlapping timestamps", func(t *testing.T) { + now := int64(1715964275000) + then := now - 150000 // 1715964125000 -- 2.5m before now + beforeThen := then - 450000 // 1715963675000 -- 7.5m before then, 10m before now + chks := chunks(now, then, beforeThen) + + t.Run("count", func(t *testing.T) { + evaluator := setup(chks, now, `sum by (level)(count_over_time({foo="bar"}[5m]))`) + + resultTs := make([]int64, 3) + resultVals := make([]float64, 3) + + start := (now - fiveMin - fiveMin) // from - step + for i := 0; i < 3; i++ { + ok, ts, r := evaluator.Next() + require.True(t, ok) + require.True(t, start <= ts && ts <= now) + + // The []promqlSample in the Vector is reused, so we need to copy the value and timestamp. + resultTs[i] = r.SampleVector()[0].T + resultVals[i] = r.SampleVector()[0].F + + require.Equal(t, group, r.SampleVector()[0].Metric) + } + + ok, _, _ := evaluator.Next() + require.False(t, ok) + + require.Equal(t, now-600000, resultTs[0]) + require.Equal(t, now-fiveMin, resultTs[1]) + require.Equal(t, now, resultTs[2]) + + require.Equal(t, float64(1), resultVals[0]) + require.Equal(t, float64(0), resultVals[1]) + require.Equal(t, float64(5), resultVals[2]) + }) + + t.Run("bytes", func(t *testing.T) { + evaluator := setup(chks, now, `sum by (level)(bytes_over_time({foo="bar"}[5m]))`) + + resultTs := make([]int64, 3) + resultVals := make([]float64, 3) + + start := (now - fiveMin - fiveMin) // from - step + for i := 0; i < 3; i++ { + ok, ts, r := evaluator.Next() + require.True(t, ok) + require.True(t, start <= ts && ts <= now) + + // The []promqlSample in the Vector is reused, so we need to copy the value and timestamp. + resultTs[i] = r.SampleVector()[0].T + resultVals[i] = r.SampleVector()[0].F + + require.Equal(t, group, r.SampleVector()[0].Metric) + } + + ok, _, _ := evaluator.Next() + require.False(t, ok) + + require.Equal(t, now-600000, resultTs[0]) + require.Equal(t, now-fiveMin, resultTs[1]) + require.Equal(t, now, resultTs[2]) + + require.Equal(t, float64(1), resultVals[0]) + require.Equal(t, float64(0), resultVals[1]) + require.Equal(t, float64(8), resultVals[2]) + }) + }) + }) + + t.Run("without grouping", func(t *testing.T) { + t.Run("evenly aligned, non-overlapping timestamps", func(t *testing.T) { + now := int64(1715964275000) + then := now - fiveMin // 1715963975000 -- 5m before n + beforeThen := then - fiveMin // 1715963675000 -- 5m before then + chks := chunks(now, then, beforeThen) + + t.Run("count", func(t *testing.T) { + evaluator := setup(chks, now, `count_over_time({foo="bar"}[5m])`) + + resultTs := make([]int64, 3) + resultVals := make([]float64, 3) + + for i := 0; i < 3; i++ { + ok, ts, r := evaluator.Next() + require.True(t, ok) + require.True(t, then-fiveMin <= ts && ts <= now) + + // The []promqlSample in the Vector is reused, so we need to copy the value and timestamp. 
+ samples := r.SampleVector() + resultTs[i] = samples[0].T + resultVals[i] = samples[0].F + + require.Equal(t, stream, samples[0].Metric) + } + + ok, _, _ := evaluator.Next() + require.False(t, ok) + + require.Equal(t, beforeThen, resultTs[0]) + require.Equal(t, then, resultTs[1]) + require.Equal(t, now, resultTs[2]) + + require.Equal(t, float64(1), resultVals[0]) + require.Equal(t, float64(2), resultVals[1]) + require.Equal(t, float64(3), resultVals[2]) + }) + + t.Run("bytes", func(t *testing.T) { + evaluator := setup(chks, now, `bytes_over_time({foo="bar"}[5m])`) + + resultTs := make([]int64, 3) + resultVals := make([]float64, 3) + + for i := 0; i < 3; i++ { + ok, ts, r := evaluator.Next() + require.True(t, ok) + require.True(t, then-fiveMin <= ts && ts <= now) + + // The []promqlSample in the Vector is reused, so we need to copy the value and timestamp. + samples := r.SampleVector() + resultTs[i] = samples[0].T + resultVals[i] = samples[0].F + + require.Equal(t, stream, samples[0].Metric) + } + + ok, _, _ := evaluator.Next() + require.False(t, ok) + + require.Equal(t, beforeThen, resultTs[0]) + require.Equal(t, then, resultTs[1]) + require.Equal(t, now, resultTs[2]) + + require.Equal(t, float64(1), resultVals[0]) + require.Equal(t, float64(3), resultVals[1]) + require.Equal(t, float64(5), resultVals[2]) + }) + }) + + t.Run("evenly aligned, overlapping timestamps", func(t *testing.T) { + now := int64(1715964275000) + then := now - 150000 // 1715964125000 -- 2.5m before now + beforeThen := then - 450000 // 1715963675000 -- 7.5m before then, 10m before now + chks := chunks(now, then, beforeThen) + + t.Run("count", func(t *testing.T) { + evaluator := setup(chks, now, `count_over_time({foo="bar"}[5m])`) + + resultTs := make([]int64, 3) + resultVals := make([]float64, 3) + + start := (now - fiveMin - fiveMin) // from - step + for i := 0; i < 3; i++ { + ok, ts, r := evaluator.Next() + require.True(t, ok) + require.True(t, start <= ts && ts <= now) + + // The []promqlSample in the Vector is reused, so we need to copy the value and timestamp. + resultTs[i] = r.SampleVector()[0].T + resultVals[i] = r.SampleVector()[0].F + + require.Equal(t, stream, r.SampleVector()[0].Metric) + } + + ok, _, _ := evaluator.Next() + require.False(t, ok) + + require.Equal(t, now-600000, resultTs[0]) + require.Equal(t, now-fiveMin, resultTs[1]) + require.Equal(t, now, resultTs[2]) + + require.Equal(t, float64(1), resultVals[0]) + require.Equal(t, float64(0), resultVals[1]) + require.Equal(t, float64(5), resultVals[2]) + }) + + t.Run("bytes", func(t *testing.T) { + evaluator := setup(chks, now, `bytes_over_time({foo="bar"}[5m])`) + + resultTs := make([]int64, 3) + resultVals := make([]float64, 3) + + start := (now - fiveMin - fiveMin) // from - step + for i := 0; i < 3; i++ { + ok, ts, r := evaluator.Next() + require.True(t, ok) + require.True(t, start <= ts && ts <= now) + + // The []promqlSample in the Vector is reused, so we need to copy the value and timestamp. 
+ resultTs[i] = r.SampleVector()[0].T + resultVals[i] = r.SampleVector()[0].F + + require.Equal(t, stream, r.SampleVector()[0].Metric) + } + + ok, _, _ := evaluator.Next() + require.False(t, ok) + + require.Equal(t, now-600000, resultTs[0]) + require.Equal(t, now-fiveMin, resultTs[1]) + require.Equal(t, now, resultTs[2]) + + require.Equal(t, float64(1), resultVals[0]) + require.Equal(t, float64(0), resultVals[1]) + require.Equal(t, float64(8), resultVals[2]) + }) + }) + }) +} diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index f3aad280250db..24c9e21c96f8a 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -2,15 +2,21 @@ package pattern import ( "context" + "math" "sync" "time" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" + "github.com/grafana/loki/v3/pkg/pattern/metric" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" ) type stream struct { @@ -20,6 +26,9 @@ type stream struct { labelHash uint64 patterns *drain.Drain mtx sync.Mutex + metrics *metric.Chunks + + evaluator metric.SampleEvaluatorFactory lastTs int64 } @@ -29,6 +38,7 @@ func newStream( labels labels.Labels, metrics *ingesterMetrics, ) (*stream, error) { + chunks := metric.NewChunks(labels) return &stream{ fp: fp, labels: labels, @@ -38,6 +48,8 @@ func newStream( PatternsEvictedTotal: metrics.patternsDiscardedTotal, PatternsDetectedTotal: metrics.patternsDetectedTotal, }), + metrics: chunks, + evaluator: metric.NewDefaultEvaluatorFactory(chunks), }, nil } @@ -48,13 +60,20 @@ func (s *stream) Push( s.mtx.Lock() defer s.mtx.Unlock() + bytes := uint64(0) + count := uint64(len(entries)) for _, entry := range entries { if entry.Timestamp.UnixNano() < s.lastTs { continue } + + bytes += uint64(len(entry.Line)) + s.lastTs = entry.Timestamp.UnixNano() s.patterns.Train(entry.Line, entry.Timestamp.UnixNano()) } + + s.metrics.Observe(bytes, count, model.TimeFromUnixNano(s.lastTs)) return nil } @@ -75,6 +94,142 @@ func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (it return iter.NewMerge(iters...), nil } +// TODO(twhitney): duplication between bytes and count iterators +func (s *stream) BytesIterator( + ctx context.Context, + expr syntax.SampleExpr, + from, through, step model.Time, +) (iter.Iterator, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + stepEvaluator, err := s.evaluator.NewStepEvaluator( + ctx, + s.evaluator, + expr, + metric.Bytes, + from, + through, + step, + ) + if err != nil { + return nil, err + } + + next, ts, r := stepEvaluator.Next() + if stepEvaluator.Error() != nil { + return nil, stepEvaluator.Error() + } + + // TODO(twhitney): actually get max series from limits + // this is only 1 series since we're already on a stream + // this this limit needs to also be enforced higher up + maxSeries := 1000 + series, err := s.JoinSampleVector( + next, + ts, + r, + stepEvaluator, + maxSeries, + from, through, step) + if err != nil { + return nil, err + } + + return metric.NewSeriesToSampleIterator(series), nil +} + +func (s *stream) JoinSampleVector( + next bool, + ts int64, + r logql.StepResult, + stepEvaluator logql.StepEvaluator, + maxSeries int, + from, through, step model.Time, +) (*promql.Series, error) { + stepCount := 
int(math.Ceil(float64(through.Sub(from).Nanoseconds()) / float64(step.UnixNano()))) + if stepCount <= 0 { + stepCount = 1 + } + + series := &promql.Series{ + Metric: s.labels, + Floats: make([]promql.FPoint, 0, stepCount), + } + + vec := promql.Vector{} + if next { + vec = r.SampleVector() + } + + // fail fast for the first step or instant query + if len(vec) > maxSeries { + return nil, logqlmodel.NewSeriesLimitError(maxSeries) + } + + for next { + vec = r.SampleVector() + for _, p := range vec { + series.Floats = append(series.Floats, promql.FPoint{ + T: ts, + F: p.F, + }) + } + + next, ts, r = stepEvaluator.Next() + if stepEvaluator.Error() != nil { + return nil, stepEvaluator.Error() + } + } + + return series, stepEvaluator.Error() +} + +// TODO(twhitney): duplication between bytes and count iterators +func (s *stream) CountIterator( + ctx context.Context, + expr syntax.SampleExpr, + from, through, step model.Time, +) (iter.Iterator, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + stepEvaluator, err := s.evaluator.NewStepEvaluator( + ctx, + s.evaluator, + expr, + metric.Count, + from, + through, + step, + ) + if err != nil { + return nil, err + } + + next, ts, r := stepEvaluator.Next() + if stepEvaluator.Error() != nil { + return nil, stepEvaluator.Error() + } + + // TODO(twhitney): actually get max series from limits + // this is only 1 series since we're already on a stream + // this this limit needs to also be enforced higher up + maxSeries := 1000 + series, err := s.JoinSampleVector( + next, + ts, + r, + stepEvaluator, + maxSeries, + from, through, step) + if err != nil { + return nil, err + } + + return metric.NewSeriesToSampleIterator(series), nil +} + func (s *stream) prune(olderThan time.Duration) bool { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go index f2218816b1113..ab0564af99951 100644 --- a/pkg/pattern/stream_test.go +++ b/pkg/pattern/stream_test.go @@ -36,7 +36,7 @@ func TestAddStream(t *testing.T) { require.NoError(t, err) it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second)) require.NoError(t, err) - res, err := iter.ReadAll(it) + res, err := iter.ReadAllWithPatterns(it) require.NoError(t, err) require.Equal(t, 1, len(res.Series)) require.Equal(t, int64(2), res.Series[0].Samples[0].Value) @@ -70,7 +70,7 @@ func TestPruneStream(t *testing.T) { require.Equal(t, false, stream.prune(time.Hour)) it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second)) require.NoError(t, err) - res, err := iter.ReadAll(it) + res, err := iter.ReadAllWithPatterns(it) require.NoError(t, err) require.Equal(t, 1, len(res.Series)) require.Equal(t, int64(1), res.Series[0].Samples[0].Value) diff --git a/pkg/util/marshal/marshal.go b/pkg/util/marshal/marshal.go index 1a4d6701b1b18..609c7ede41d0a 100644 --- a/pkg/util/marshal/marshal.go +++ b/pkg/util/marshal/marshal.go @@ -202,8 +202,14 @@ func WriteQueryPatternsResponseJSON(r *logproto.QueryPatternsResponse, w io.Writ if len(r.Series) > 0 { for i, series := range r.Series { s.WriteObjectStart() - s.WriteObjectField("pattern") - s.WriteStringWithHTMLEscaped(series.Pattern) + if pattern := series.GetPattern(); pattern != "" { + s.WriteObjectField("pattern") + s.WriteStringWithHTMLEscaped(pattern) + } + if labels := series.GetLabels(); labels != "" { + s.WriteObjectField("labels") + s.WriteStringWithHTMLEscaped(labels) + } s.WriteMore() s.WriteObjectField("samples") s.WriteArrayStart() diff --git 
diff --git a/pkg/util/marshal/marshal_test.go b/pkg/util/marshal/marshal_test.go
index c749677f77026..4688388744c9d 100644
--- a/pkg/util/marshal/marshal_test.go
+++ b/pkg/util/marshal/marshal_test.go
@@ -1077,13 +1077,11 @@ func Test_WriteQueryPatternsResponseJSON(t *testing.T) {
 		{
 			&logproto.QueryPatternsResponse{
 				Series: []*logproto.PatternSeries{
-					{
-						Pattern: "foo <*> bar",
-						Samples: []*logproto.PatternSample{
-							{Timestamp: model.TimeFromUnix(1), Value: 1},
-							{Timestamp: model.TimeFromUnix(2), Value: 2},
-						},
+					logproto.NewPatternSeriesWithPattern("foo <*> bar", []*logproto.PatternSample{
+						{Timestamp: model.TimeFromUnix(1), Value: 1},
+						{Timestamp: model.TimeFromUnix(2), Value: 2},
 					},
+					),
 				},
 			},
 			`{"status":"success","data":[{"pattern":"foo <*> bar","samples":[[1,1],[2,2]]}]}`,
@@ -1091,20 +1089,17 @@
 		{
 			&logproto.QueryPatternsResponse{
 				Series: []*logproto.PatternSeries{
-					{
-						Pattern: "foo <*> bar",
-						Samples: []*logproto.PatternSample{
-							{Timestamp: model.TimeFromUnix(1), Value: 1},
-							{Timestamp: model.TimeFromUnix(2), Value: 2},
-						},
+					logproto.NewPatternSeriesWithPattern("foo <*> bar", []*logproto.PatternSample{
+						{Timestamp: model.TimeFromUnix(1), Value: 1},
+						{Timestamp: model.TimeFromUnix(2), Value: 2},
 					},
-					{
-						Pattern: "foo <*> buzz",
-						Samples: []*logproto.PatternSample{
+					),
+					logproto.NewPatternSeriesWithPattern("foo <*> buzz",
+						[]*logproto.PatternSample{
 							{Timestamp: model.TimeFromUnix(3), Value: 1},
 							{Timestamp: model.TimeFromUnix(3), Value: 2},
 						},
-					},
+					),
 				},
 			},
 			`{"status":"success","data":[{"pattern":"foo <*> bar","samples":[[1,1],[2,2]]},{"pattern":"foo <*> buzz","samples":[[3,1],[3,2]]}]}`,
@@ -1112,17 +1107,58 @@
 		{
 			&logproto.QueryPatternsResponse{
 				Series: []*logproto.PatternSeries{
-					{
-						Pattern: "foo <*> bar",
-						Samples: []*logproto.PatternSample{},
+					logproto.NewPatternSeriesWithPattern("foo <*> bar",
+						[]*logproto.PatternSample{},
+					),
+					logproto.NewPatternSeriesWithPattern("foo <*> buzz",
+						[]*logproto.PatternSample{},
+					),
+				},
+			},
+			`{"status":"success","data":[{"pattern":"foo <*> bar","samples":[]},{"pattern":"foo <*> buzz","samples":[]}]}`,
+		},
+		{
+			&logproto.QueryPatternsResponse{
+				Series: []*logproto.PatternSeries{
+					logproto.NewPatternSeriesWithLabels(`{foo="bar"}`, []*logproto.PatternSample{
+						{Timestamp: model.TimeFromUnix(1), Value: 1},
+						{Timestamp: model.TimeFromUnix(2), Value: 2},
 					},
-					{
-						Pattern: "foo <*> buzz",
-						Samples: []*logproto.PatternSample{},
+					),
+				},
+			},
+			`{"status":"success","data":[{"labels":"{foo=\"bar\"}","samples":[[1,1],[2,2]]}]}`,
+		},
+		{
+			&logproto.QueryPatternsResponse{
+				Series: []*logproto.PatternSeries{
+					logproto.NewPatternSeriesWithLabels(`{foo="bar"}`, []*logproto.PatternSample{
+						{Timestamp: model.TimeFromUnix(1), Value: 1},
+						{Timestamp: model.TimeFromUnix(2), Value: 2},
 					},
+					),
+					logproto.NewPatternSeriesWithLabels(`{foo="buzz"}`,
+						[]*logproto.PatternSample{
+							{Timestamp: model.TimeFromUnix(3), Value: 1},
+							{Timestamp: model.TimeFromUnix(3), Value: 2},
+						},
+					),
 				},
 			},
-			`{"status":"success","data":[{"pattern":"foo <*> bar","samples":[]},{"pattern":"foo <*> buzz","samples":[]}]}`,
+			`{"status":"success","data":[{"labels":"{foo=\"bar\"}","samples":[[1,1],[2,2]]},{"labels":"{foo=\"buzz\"}","samples":[[3,1],[3,2]]}]}`,
+		},
+		{
+			&logproto.QueryPatternsResponse{
+				Series: []*logproto.PatternSeries{
+					logproto.NewPatternSeriesWithLabels(`{foo="bar"}`,
+						[]*logproto.PatternSample{},
+					),
+					logproto.NewPatternSeriesWithPattern(`{foo="buzz"}`,
+						[]*logproto.PatternSample{},
+					),
+				},
+			},
+			`{"status":"success","data":[{"labels":"{foo=\"bar\"}","samples":[]},{"pattern":"{foo=\"buzz\"}","samples":[]}]}`,
+		},
 	} {
 		tc := tc