diff --git a/docs/api.md b/docs/api.md index bb1f049b7ddbf..e317fd12f0436 100644 --- a/docs/api.md +++ b/docs/api.md @@ -11,11 +11,15 @@ The HTTP API includes the following endpoints: - [`GET /loki/api/v1/labels`](#get-lokiapiv1labels) - [`GET /loki/api/v1/label//values`](#get-lokiapiv1labelnamevalues) - [`GET /loki/api/v1/tail`](#get-lokiapiv1tail) +- [`GET /loki/api/v1/series`](#series) +- [`POST /loki/api/v1/series`](#series) - [`POST /loki/api/v1/push`](#post-lokiapiv1push) - [`GET /api/prom/tail`](#get-apipromtail) - [`GET /api/prom/query`](#get-apipromquery) - [`GET /api/prom/label`](#get-apipromlabel) - [`GET /api/prom/label//values`](#get-apipromlabelnamevalues) +- [`GET /api/prom/series`](#series) +- [`POST /api/prom/series`](#series) - [`POST /api/prom/push`](#post-apiprompush) - [`GET /ready`](#get-ready) - [`POST /flush`](#post-flush) @@ -743,3 +747,75 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester. for a list of exported metrics. In microservices mode, the `/metrics` endpoint is exposed by all components. + +## Series + +The Series API is available under the following: +- `GET /loki/api/v1/series` +- `POST /loki/api/v1/series` +- `GET /api/prom/series` +- `POST /api/prom/series` + +This endpoint returns the list of time series that match a certain label set. + +URL query parameters: + +- `match[]=`: Repeated log stream selector argument that selects the streams to return. At least one `match[]` argument must be provided. +- `start=`: Start timestamp. +- `end=`: End timestamp. + +You can URL-encode these parameters directly in the request body by using the POST method and `Content-Type: application/x-www-form-urlencoded` header. This is useful when specifying a large or dynamic number of stream selectors that may breach server-side URL character limits. + +In microservices mode, these endpoints are exposed by the querier. 
+ +### Examples + +``` bash +$ curl -s "http://localhost:3100/loki/api/v1/series" --data-urlencode 'match={container_name=~"prometheus.*", component="server"}' --data-urlencode 'match={app="loki"}' | jq '.' +{ + "status": "success", + "data": [ + { + "container_name": "loki", + "app": "loki", + "stream": "stderr", + "filename": "/var/log/pods/default_loki-stack-0_50835643-1df0-11ea-ba79-025000000001/loki/0.log", + "name": "loki", + "job": "default/loki", + "controller_revision_hash": "loki-stack-757479754d", + "statefulset_kubernetes_io_pod_name": "loki-stack-0", + "release": "loki-stack", + "namespace": "default", + "instance": "loki-stack-0" + }, + { + "chart": "prometheus-9.3.3", + "container_name": "prometheus-server-configmap-reload", + "filename": "/var/log/pods/default_loki-stack-prometheus-server-696cc9ddff-87lmq_507b1db4-1df0-11ea-ba79-025000000001/prometheus-server-configmap-reload/0.log", + "instance": "loki-stack-prometheus-server-696cc9ddff-87lmq", + "pod_template_hash": "696cc9ddff", + "app": "prometheus", + "component": "server", + "heritage": "Tiller", + "job": "default/prometheus", + "namespace": "default", + "release": "loki-stack", + "stream": "stderr" + }, + { + "app": "prometheus", + "component": "server", + "filename": "/var/log/pods/default_loki-stack-prometheus-server-696cc9ddff-87lmq_507b1db4-1df0-11ea-ba79-025000000001/prometheus-server/0.log", + "release": "loki-stack", + "namespace": "default", + "pod_template_hash": "696cc9ddff", + "stream": "stderr", + "chart": "prometheus-9.3.3", + "container_name": "prometheus-server", + "heritage": "Tiller", + "instance": "loki-stack-prometheus-server-696cc9ddff-87lmq", + "job": "default/prometheus" + } + ] +} +``` diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 88cf9166164ea..c557628df9e30 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -236,6 +236,17 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp return 
instance.Label(ctx, req) } +// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers +func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { + instanceID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + instance := i.getOrCreateInstance(instanceID) + return instance.Series(ctx, req) +} + // Check implements grpc_health_v1.HealthCheck. func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 620a79b80c5a3..0dd1b7ee90372 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -99,6 +99,91 @@ func TestIngester(t *testing.T) { require.Len(t, result.resps, 1) require.Len(t, result.resps[0].Streams, 1) require.Equal(t, `{bar="baz2", foo="bar"}`, result.resps[0].Streams[0].Labels) + + // Series + + // empty matchers + _, err = i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Unix(0, 0), + End: time.Unix(1, 0), + }) + require.Error(t, err) + + // wrong matchers fmt + _, err = i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Unix(0, 0), + End: time.Unix(1, 0), + Groups: []string{`{a="b`}, + }) + require.Error(t, err) + + // no selectors + _, err = i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Unix(0, 0), + End: time.Unix(1, 0), + Groups: []string{`{foo="bar"}`, `{}`}, + }) + require.Error(t, err) + + // foo=bar + resp, err := i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Unix(0, 0), + End: time.Unix(1, 0), + Groups: []string{`{foo="bar"}`}, + }) + require.Nil(t, err) + require.ElementsMatch(t, []logproto.SeriesIdentifier{ + { + Labels: map[string]string{ + "foo": "bar", + "bar": "baz1", + }, + }, + { + Labels: map[string]string{ + "foo": "bar", + "bar": 
"baz2", + }, + }, + }, resp.GetSeries()) + + // foo=bar, bar=~"baz[2-9]" + resp, err = i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Unix(0, 0), + End: time.Unix(1, 0), + Groups: []string{`{foo="bar", bar=~"baz[2-9]"}`}, + }) + require.Nil(t, err) + require.ElementsMatch(t, []logproto.SeriesIdentifier{ + { + Labels: map[string]string{ + "foo": "bar", + "bar": "baz2", + }, + }, + }, resp.GetSeries()) + + // foo=bar, bar=~"baz[2-9]" in different groups should OR the results + resp, err = i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Unix(0, 0), + End: time.Unix(1, 0), + Groups: []string{`{foo="bar"}`, `{bar=~"baz[2-9]"}`}, + }) + require.Nil(t, err) + require.ElementsMatch(t, []logproto.SeriesIdentifier{ + { + Labels: map[string]string{ + "foo": "bar", + "bar": "baz1", + }, + }, + { + Labels: map[string]string{ + "foo": "bar", + "bar": "baz2", + }, + }, + }, resp.GetSeries()) } func TestIngesterStreamLimitExceeded(t *testing.T) { diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 8296faed74718..0e844ab660113 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/util" @@ -190,7 +191,20 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie if err != nil { return err } - iters, err := i.lookupStreams(queryServer.Context(), req, expr.Matchers(), filter) + + var iters []iter.EntryIterator + + err = i.forMatchingStreams( + expr.Matchers(), + func(stream *stream) error { + iter, err := stream.Iterator(queryServer.Context(), req.Start, req.End, req.Direction, filter) + if err != nil { + return err + } + iters = append(iters, iter) + return nil + }, + ) if err != nil { return err } @@ -221,32 +235,69 @@ func (i 
*instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro }, nil } -func (i *instance) lookupStreams(ctx context.Context, req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) { +func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { + groups, err := loghttp.Match(req.GetGroups()) + if err != nil { + return nil, err + } + + dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) + for _, matchers := range groups { + err = i.forMatchingStreams(matchers, func(stream *stream) error { + // exit early when this stream was added by an earlier group + key := stream.labels.Hash() + if _, found := dedupedSeries[key]; found { + return nil + } + + dedupedSeries[key] = logproto.SeriesIdentifier{ + Labels: stream.labels.Map(), + } + return nil + }) + + if err != nil { + return nil, err + } + } + series := make([]logproto.SeriesIdentifier, 0, len(dedupedSeries)) + for _, v := range dedupedSeries { + series = append(series, v) + + } + return &logproto.SeriesResponse{Series: series}, nil +} + +// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc). +// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex. 
+func (i *instance) forMatchingStreams( + matchers []*labels.Matcher, + fn func(*stream) error, +) error { i.streamsMtx.RLock() defer i.streamsMtx.RUnlock() filters, matchers := cutil.SplitFiltersAndMatchers(matchers) ids := i.index.Lookup(matchers) - iterators := make([]iter.EntryIterator, 0, len(ids)) outer: for _, streamID := range ids { stream, ok := i.streams[streamID] if !ok { - return nil, ErrStreamMissing + return ErrStreamMissing } for _, filter := range filters { if !filter.Matches(stream.labels.Get(filter.Name)) { continue outer } } - iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter) + + err := fn(stream) if err != nil { - return nil, err + return err } - iterators = append(iterators, iter) } - return iterators, nil + return nil } func (i *instance) addNewTailer(t *tailer) { diff --git a/pkg/loghttp/labels.go b/pkg/loghttp/labels.go index 326cc431adfa3..688b2ebabe109 100644 --- a/pkg/loghttp/labels.go +++ b/pkg/loghttp/labels.go @@ -19,6 +19,11 @@ type LabelResponse struct { // LabelSet is a key/value pair mapping of labels type LabelSet map[string]string +// Map coerces LabelSet into a map[string]string. This is useful for working with adapter types. +func (l LabelSet) Map() map[string]string { + return l +} + // String implements the Stringer interface. It returns a formatted/sorted set of label key/value pairs. 
func (l LabelSet) String() string { var b bytes.Buffer diff --git a/pkg/loghttp/labels_test.go b/pkg/loghttp/labels_test.go index 00f4c23a4443f..760487c01dfd1 100644 --- a/pkg/loghttp/labels_test.go +++ b/pkg/loghttp/labels_test.go @@ -57,6 +57,22 @@ func TestParseLabelQuery(t *testing.T) { } } +func TestLabelsMap(t *testing.T) { + ls := LabelSet{ + "a": "1", + "b": "2", + } + + require.Equal( + t, + map[string]string{ + "a": "1", + "b": "2", + }, + ls.Map(), + ) +} + func timePtr(t time.Time) *time.Time { return &t } diff --git a/pkg/loghttp/params.go b/pkg/loghttp/params.go index 4f054c9d9baa6..1244a7a623686 100644 --- a/pkg/loghttp/params.go +++ b/pkg/loghttp/params.go @@ -10,8 +10,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" ) const ( @@ -71,6 +73,27 @@ func step(r *http.Request, start, end time.Time) (time.Duration, error) { return 0, errors.Errorf("cannot parse %q to a valid duration", value) } +// Match extracts and parses multiple matcher groups from a slice of strings +func Match(xs []string) ([][]*labels.Matcher, error) { + if len(xs) == 0 { + return nil, errors.New("0 matcher groups supplied") + } + + groups := make([][]*labels.Matcher, 0, len(xs)) + for _, x := range xs { + ms, err := logql.ParseMatchers(x) + if err != nil { + return nil, err + } + if len(ms) == 0 { + return nil, errors.Errorf("0 matchers in group: %s", x) + } + groups = append(groups, ms) + } + + return groups, nil +} + // defaultQueryRangeStep returns the default step used in the query range API, // which is dinamically calculated based on the time range func defaultQueryRangeStep(start time.Time, end time.Time) int { diff --git a/pkg/loghttp/params_test.go b/pkg/loghttp/params_test.go index 59fa6e4809fce..8343668be6284 100644 --- a/pkg/loghttp/params_test.go +++ b/pkg/loghttp/params_test.go @@ -7,6 +7,7 @@ import ( "time" 
"github.com/grafana/loki/pkg/logproto" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -167,3 +168,62 @@ func Test_parseTimestamp(t *testing.T) { }) } } + +func Test_match(t *testing.T) { + + tests := []struct { + name string + input []string + want [][]*labels.Matcher + wantErr bool + }{ + {"malformed", []string{`{a="1`}, nil, true}, + {"errors on nil input", nil, nil, true}, + { + "single", + []string{`{a="1"}`}, + [][]*labels.Matcher{ + {mustMatcher(labels.MatchEqual, "a", "1")}, + }, + false, + }, + { + "multiple groups", + []string{`{a="1"}`, `{b="2", c=~"3", d!="4"}`}, + [][]*labels.Matcher{ + {mustMatcher(labels.MatchEqual, "a", "1")}, + { + mustMatcher(labels.MatchEqual, "b", "2"), + mustMatcher(labels.MatchRegexp, "c", "3"), + mustMatcher(labels.MatchNotEqual, "d", "4"), + }, + }, + false, + }, + { + "errors on empty group", + []string{`{}`}, + nil, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := Match(tt.input) + if tt.wantErr { + require.Error(t, err) + } else { + require.Equal(t, tt.want, got) + } + + }) + } +} + +func mustMatcher(t labels.MatchType, n string, v string) *labels.Matcher { + m, err := labels.NewMatcher(t, n, v) + if err != nil { + panic(err) + } + return m +} diff --git a/pkg/loghttp/series.go b/pkg/loghttp/series.go new file mode 100644 index 0000000000000..abbac0fefafe3 --- /dev/null +++ b/pkg/loghttp/series.go @@ -0,0 +1,35 @@ +package loghttp + +import ( + "net/http" + + "github.com/grafana/loki/pkg/logproto" +) + +type SeriesResponse struct { + Status string `json:"status"` + Data []LabelSet `json:"data"` +} + +func ParseSeriesQuery(r *http.Request) (*logproto.SeriesRequest, error) { + start, end, err := bounds(r) + if err != nil { + return nil, err + } + + xs := r.Form["match"] + + // ensure matchers are valid before fanning out to ingesters/store as well as returning valuable parsing errors + // instead 
of 500s + _, err = Match(xs) + if err != nil { + return nil, err + } + + return &logproto.SeriesRequest{ + Start: start, + End: end, + Groups: xs, + }, nil + +} diff --git a/pkg/loghttp/series_test.go b/pkg/loghttp/series_test.go new file mode 100644 index 0000000000000..97b1f00e994ee --- /dev/null +++ b/pkg/loghttp/series_test.go @@ -0,0 +1,73 @@ +package loghttp + +import ( + "net/http" + "net/url" + "testing" + + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" +) + +func TestParseSeriesQuery(t *testing.T) { + for _, tc := range []struct { + desc string + input *http.Request + shouldErr bool + expected *logproto.SeriesRequest + }{ + { + "no match", + withForm(url.Values{}), + true, + nil, + }, + { + "malformed", + withForm(url.Values{ + "match": []string{`{a="}`}, + }), + true, + nil, + }, + { + "multiple matches", + withForm(url.Values{ + "start": []string{"1000"}, + "end": []string{"2000"}, + "match": []string{`{a="1"}`, `{b="2", c=~"3", d!="4"}`}, + }), + false, + mkSeriesRequest(t, "1000", "2000", []string{`{a="1"}`, `{b="2", c=~"3", d!="4"}`}), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + out, err := ParseSeriesQuery(tc.input) + if tc.shouldErr { + require.Error(t, err) + } else { + require.Nil(t, err) + require.Equal(t, tc.expected, out) + } + }) + } +} + +func withForm(form url.Values) *http.Request { + return &http.Request{Form: form} +} + +func mkSeriesRequest(t *testing.T, from, to string, matches []string) *logproto.SeriesRequest { + start, end, err := bounds(withForm(url.Values{ + "start": []string{from}, + "end": []string{to}, + })) + require.Nil(t, err) + + require.Nil(t, err) + return &logproto.SeriesRequest{ + Start: start, + End: end, + Groups: matches, + } +} diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go index 85dbc32717cc7..b33264c453629 100644 --- a/pkg/logproto/logproto.pb.go +++ b/pkg/logproto/logproto.pb.go @@ -9,6 +9,7 @@ import ( fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" 
proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" _ "github.com/gogo/protobuf/types" github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" grpc "google.golang.org/grpc" @@ -582,6 +583,151 @@ func (m *TailResponse) GetDroppedStreams() []*DroppedStream { return nil } +type SeriesRequest struct { + Start time.Time `protobuf:"bytes,1,opt,name=start,proto3,stdtime" json:"start"` + End time.Time `protobuf:"bytes,2,opt,name=end,proto3,stdtime" json:"end"` + Groups []string `protobuf:"bytes,3,rep,name=groups,proto3" json:"groups,omitempty"` +} + +func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } +func (*SeriesRequest) ProtoMessage() {} +func (*SeriesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{10} +} +func (m *SeriesRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SeriesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SeriesRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SeriesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SeriesRequest.Merge(m, src) +} +func (m *SeriesRequest) XXX_Size() int { + return m.Size() +} +func (m *SeriesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SeriesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SeriesRequest proto.InternalMessageInfo + +func (m *SeriesRequest) GetStart() time.Time { + if m != nil { + return m.Start + } + return time.Time{} +} + +func (m *SeriesRequest) GetEnd() time.Time { + if m != nil { + return m.End + } + return time.Time{} +} + +func (m *SeriesRequest) GetGroups() []string { + if m != nil { + return m.Groups + } + return nil +} + +type SeriesResponse struct { + Series []SeriesIdentifier `protobuf:"bytes,1,rep,name=series,proto3" json:"series"` +} + +func (m 
*SeriesResponse) Reset() { *m = SeriesResponse{} } +func (*SeriesResponse) ProtoMessage() {} +func (*SeriesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{11} +} +func (m *SeriesResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SeriesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SeriesResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SeriesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SeriesResponse.Merge(m, src) +} +func (m *SeriesResponse) XXX_Size() int { + return m.Size() +} +func (m *SeriesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SeriesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SeriesResponse proto.InternalMessageInfo + +func (m *SeriesResponse) GetSeries() []SeriesIdentifier { + if m != nil { + return m.Series + } + return nil +} + +type SeriesIdentifier struct { + Labels map[string]string `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (m *SeriesIdentifier) Reset() { *m = SeriesIdentifier{} } +func (*SeriesIdentifier) ProtoMessage() {} +func (*SeriesIdentifier) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{12} +} +func (m *SeriesIdentifier) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SeriesIdentifier) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SeriesIdentifier.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SeriesIdentifier) XXX_Merge(src proto.Message) { + xxx_messageInfo_SeriesIdentifier.Merge(m, src) 
+} +func (m *SeriesIdentifier) XXX_Size() int { + return m.Size() +} +func (m *SeriesIdentifier) XXX_DiscardUnknown() { + xxx_messageInfo_SeriesIdentifier.DiscardUnknown(m) +} + +var xxx_messageInfo_SeriesIdentifier proto.InternalMessageInfo + +func (m *SeriesIdentifier) GetLabels() map[string]string { + if m != nil { + return m.Labels + } + return nil +} + type DroppedStream struct { From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"` To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"` @@ -591,7 +737,7 @@ type DroppedStream struct { func (m *DroppedStream) Reset() { *m = DroppedStream{} } func (*DroppedStream) ProtoMessage() {} func (*DroppedStream) Descriptor() ([]byte, []int) { - return fileDescriptor_7a8976f235a02f79, []int{10} + return fileDescriptor_7a8976f235a02f79, []int{13} } func (m *DroppedStream) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -651,7 +797,7 @@ type TimeSeriesChunk struct { func (m *TimeSeriesChunk) Reset() { *m = TimeSeriesChunk{} } func (*TimeSeriesChunk) ProtoMessage() {} func (*TimeSeriesChunk) Descriptor() ([]byte, []int) { - return fileDescriptor_7a8976f235a02f79, []int{11} + return fileDescriptor_7a8976f235a02f79, []int{14} } func (m *TimeSeriesChunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -716,7 +862,7 @@ type LabelPair struct { func (m *LabelPair) Reset() { *m = LabelPair{} } func (*LabelPair) ProtoMessage() {} func (*LabelPair) Descriptor() ([]byte, []int) { - return fileDescriptor_7a8976f235a02f79, []int{12} + return fileDescriptor_7a8976f235a02f79, []int{15} } func (m *LabelPair) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -766,7 +912,7 @@ type Chunk struct { func (m *Chunk) Reset() { *m = Chunk{} } func (*Chunk) ProtoMessage() {} func (*Chunk) Descriptor() ([]byte, []int) { - return fileDescriptor_7a8976f235a02f79, []int{13} + return fileDescriptor_7a8976f235a02f79, []int{16} } func (m *Chunk) XXX_Unmarshal(b []byte) error { return 
m.Unmarshal(b) @@ -808,7 +954,7 @@ type TransferChunksResponse struct { func (m *TransferChunksResponse) Reset() { *m = TransferChunksResponse{} } func (*TransferChunksResponse) ProtoMessage() {} func (*TransferChunksResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_7a8976f235a02f79, []int{14} + return fileDescriptor_7a8976f235a02f79, []int{17} } func (m *TransferChunksResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -849,6 +995,10 @@ func init() { proto.RegisterType((*Entry)(nil), "logproto.Entry") proto.RegisterType((*TailRequest)(nil), "logproto.TailRequest") proto.RegisterType((*TailResponse)(nil), "logproto.TailResponse") + proto.RegisterType((*SeriesRequest)(nil), "logproto.SeriesRequest") + proto.RegisterType((*SeriesResponse)(nil), "logproto.SeriesResponse") + proto.RegisterType((*SeriesIdentifier)(nil), "logproto.SeriesIdentifier") + proto.RegisterMapType((map[string]string)(nil), "logproto.SeriesIdentifier.LabelsEntry") proto.RegisterType((*DroppedStream)(nil), "logproto.DroppedStream") proto.RegisterType((*TimeSeriesChunk)(nil), "logproto.TimeSeriesChunk") proto.RegisterType((*LabelPair)(nil), "logproto.LabelPair") @@ -859,65 +1009,72 @@ func init() { func init() { proto.RegisterFile("logproto.proto", fileDescriptor_7a8976f235a02f79) } var fileDescriptor_7a8976f235a02f79 = []byte{ - // 920 bytes of a gzipped FileDescriptorProto + // 1039 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4f, 0x6f, 0x1b, 0x45, - 0x14, 0xdf, 0xb1, 0xd7, 0xf6, 0xfa, 0xf9, 0x4f, 0xac, 0x21, 0x24, 0xc6, 0xa0, 0xb5, 0xb5, 0x07, - 0x6a, 0x15, 0xe1, 0x80, 0x29, 0x14, 0x0a, 0x12, 0x8a, 0x5b, 0x22, 0x52, 0x90, 0x68, 0x37, 0x91, - 0x38, 0xa1, 0x6a, 0x93, 0x9d, 0x38, 0x2b, 0xd6, 0x3b, 0xee, 0xcc, 0x18, 0x29, 0x37, 0x3e, 0x42, - 0x6f, 0x7c, 0x05, 0xc4, 0x81, 0x8f, 0xc0, 0xb9, 0xc7, 0x1c, 0x7b, 0x0a, 0xc4, 0xb9, 0xa0, 0x48, - 0x48, 0xfd, 0x08, 0x68, 0xfe, 0xec, 0x9f, 0xa4, 0x11, 0x24, 0xbd, 
0xd8, 0xf3, 0x66, 0xde, 0x9b, - 0x79, 0xbf, 0xdf, 0xfb, 0xbd, 0xb7, 0xd0, 0x8e, 0xe9, 0x74, 0xce, 0xa8, 0xa0, 0x23, 0xf5, 0x8b, - 0x9d, 0xd4, 0xee, 0xf5, 0xa7, 0x94, 0x4e, 0x63, 0xb2, 0xa1, 0xac, 0xbd, 0xc5, 0xc1, 0x86, 0x88, - 0x66, 0x84, 0x8b, 0x60, 0x36, 0xd7, 0xae, 0xbd, 0xf7, 0xa7, 0x91, 0x38, 0x5c, 0xec, 0x8d, 0xf6, - 0xe9, 0x6c, 0x63, 0x4a, 0xa7, 0x34, 0xf7, 0x94, 0x96, 0x32, 0xd4, 0x4a, 0xbb, 0x7b, 0x5b, 0xd0, - 0x78, 0xb4, 0xe0, 0x87, 0x3e, 0x79, 0xba, 0x20, 0x5c, 0xe0, 0xbb, 0x50, 0xe3, 0x82, 0x91, 0x60, - 0xc6, 0xbb, 0x68, 0x50, 0x1e, 0x36, 0xc6, 0x9d, 0x51, 0x96, 0xca, 0x8e, 0x3a, 0x98, 0x34, 0xce, - 0x4f, 0xfa, 0xa9, 0x93, 0x9f, 0x2e, 0xbc, 0x36, 0x34, 0xf5, 0x3d, 0x7c, 0x4e, 0x13, 0x4e, 0xbc, - 0x7f, 0x10, 0x34, 0x1f, 0x2f, 0x08, 0x3b, 0x4a, 0x6f, 0xee, 0x81, 0xc3, 0x49, 0x4c, 0xf6, 0x05, - 0x65, 0x5d, 0x34, 0x40, 0xc3, 0xba, 0x9f, 0xd9, 0x78, 0x15, 0x2a, 0x71, 0x34, 0x8b, 0x44, 0xb7, - 0x34, 0x40, 0xc3, 0x96, 0xaf, 0x0d, 0x7c, 0x0f, 0x2a, 0x5c, 0x04, 0x4c, 0x74, 0xcb, 0x03, 0x34, - 0x6c, 0x8c, 0x7b, 0x23, 0x0d, 0x7d, 0x94, 0x02, 0x1a, 0xed, 0xa6, 0xd0, 0x27, 0xce, 0xf3, 0x93, - 0xbe, 0xf5, 0xec, 0xcf, 0x3e, 0xf2, 0x75, 0x08, 0xfe, 0x04, 0xca, 0x24, 0x09, 0xbb, 0xf6, 0x0d, - 0x22, 0x65, 0x00, 0xfe, 0x10, 0xea, 0x61, 0xc4, 0xc8, 0xbe, 0x88, 0x68, 0xd2, 0xad, 0x0c, 0xd0, - 0xb0, 0x3d, 0x7e, 0x23, 0x67, 0xe0, 0x41, 0x7a, 0xe4, 0xe7, 0x5e, 0x0f, 0x6d, 0xa7, 0xda, 0xa9, - 0x79, 0x9f, 0x43, 0xcb, 0xc0, 0xd5, 0x04, 0xe0, 0xdb, 0xff, 0xcb, 0x64, 0x4e, 0xde, 0xef, 0x08, - 0x9a, 0xdf, 0x06, 0x7b, 0x24, 0x4e, 0xc9, 0xc2, 0x60, 0x27, 0xc1, 0x8c, 0x18, 0xa2, 0xd4, 0x1a, - 0xaf, 0x41, 0xf5, 0xa7, 0x20, 0x5e, 0x10, 0xae, 0x58, 0x72, 0x7c, 0x63, 0xdd, 0x94, 0x26, 0xf4, - 0xda, 0x34, 0xa1, 0x8c, 0x26, 0xef, 0x16, 0xb4, 0x4c, 0xbe, 0x06, 0x6d, 0x9e, 0x9c, 0x04, 0x5b, - 0x4f, 0x93, 0xf3, 0x0e, 0xa1, 0xaa, 0xc1, 0x62, 0x0f, 0xaa, 0xb1, 0x0c, 0xe1, 0x1a, 0xd4, 0x04, - 0xce, 0x4f, 0xfa, 0x66, 0xc7, 0x37, 0xff, 0xf8, 0x1e, 0xd4, 0x48, 0x22, 0x58, 0xa4, 0x30, 0x4a, - 0xce, 
0x56, 0x72, 0xce, 0xbe, 0x4a, 0x04, 0x3b, 0x9a, 0xac, 0xc8, 0x72, 0x49, 0x01, 0x1a, 0x3f, - 0x3f, 0x5d, 0x78, 0x14, 0x2a, 0xca, 0x05, 0x7f, 0x0d, 0xf5, 0xac, 0x27, 0xd4, 0x5b, 0xff, 0x8d, - 0xac, 0x6d, 0x6e, 0x2c, 0x09, 0xae, 0xf0, 0xe5, 0xc1, 0xf8, 0x1d, 0xb0, 0xe3, 0x28, 0x21, 0x8a, - 0xef, 0xfa, 0xc4, 0x39, 0x3f, 0xe9, 0x2b, 0xdb, 0x57, 0xbf, 0xde, 0x2f, 0x08, 0x1a, 0xbb, 0x41, - 0x94, 0xd5, 0x6c, 0x15, 0x2a, 0x4f, 0xa5, 0x02, 0x4c, 0xd1, 0xb4, 0x21, 0x65, 0x1f, 0x92, 0x38, - 0x38, 0xda, 0xa2, 0x4c, 0x15, 0xa8, 0xe5, 0x67, 0x76, 0x2e, 0x7b, 0xfb, 0x4a, 0xd9, 0x57, 0x6e, - 0x2c, 0xfb, 0x87, 0xb6, 0x53, 0xea, 0x94, 0xbd, 0x23, 0x68, 0xea, 0xc4, 0x4c, 0x71, 0x86, 0x50, - 0xd5, 0x4a, 0x33, 0x74, 0xbc, 0xaa, 0x44, 0x73, 0x8e, 0xbf, 0x84, 0x76, 0xc8, 0xe8, 0x7c, 0x4e, - 0xc2, 0x1d, 0xa3, 0x5d, 0x5d, 0x87, 0xf5, 0x42, 0x0f, 0x14, 0xcf, 0xfd, 0x4b, 0xee, 0x92, 0x94, - 0xd6, 0x05, 0x0f, 0xfc, 0x29, 0xd8, 0x07, 0x8c, 0xce, 0xae, 0x51, 0x89, 0x1c, 0x8d, 0x8a, 0xc0, - 0x77, 0xa0, 0x24, 0xa8, 0x22, 0xff, 0xba, 0x71, 0x25, 0x41, 0xa5, 0x12, 0x8d, 0xce, 0xca, 0xaa, - 0x0e, 0xc6, 0xf2, 0x7e, 0x43, 0xb0, 0x22, 0x63, 0x76, 0x88, 0x94, 0xcb, 0xfd, 0xc3, 0x45, 0xf2, - 0x23, 0x1e, 0x42, 0x47, 0xbe, 0xf4, 0x24, 0x4a, 0xa6, 0x84, 0x0b, 0xc2, 0x9e, 0x44, 0xa1, 0xa9, - 0x5e, 0x5b, 0xee, 0x6f, 0x9b, 0xed, 0xed, 0x10, 0xaf, 0x43, 0x6d, 0xc1, 0xb5, 0x43, 0x49, 0x5f, - 0x2b, 0xcd, 0xed, 0x10, 0xbf, 0x57, 0x78, 0x4e, 0x32, 0x55, 0x98, 0x16, 0xaa, 0x43, 0x1e, 0x05, - 0x11, 0xcb, 0xf4, 0x7d, 0x0b, 0xaa, 0xfb, 0xf2, 0x61, 0xde, 0xb5, 0x2f, 0xcb, 0x5b, 0x25, 0xe4, - 0x9b, 0x63, 0xef, 0x63, 0xa8, 0x67, 0xd1, 0x57, 0x0e, 0x83, 0x55, 0xa8, 0xa8, 0x0e, 0x33, 0xd9, - 0x68, 0xc3, 0x7b, 0x1b, 0x2a, 0x1a, 0x18, 0x06, 0x3b, 0x0c, 0x44, 0xa0, 0x42, 0x9a, 0xbe, 0x5a, - 0x7b, 0x5d, 0x58, 0xdb, 0x65, 0x41, 0xc2, 0x0f, 0x08, 0x53, 0x4e, 0x3c, 0xd5, 0xc7, 0xed, 0x77, - 0xa1, 0x9e, 0x4d, 0x36, 0xdc, 0x80, 0xda, 0xd6, 0x77, 0xfe, 0xf7, 0x9b, 0xfe, 0x83, 0x8e, 0x85, - 0x9b, 0xe0, 0x4c, 0x36, 0xef, 0x7f, 0xa3, 
0x2c, 0x34, 0xde, 0x84, 0xaa, 0x9c, 0xf1, 0x84, 0xe1, - 0xbb, 0x60, 0xcb, 0x15, 0x7e, 0x33, 0x07, 0x50, 0xf8, 0x8a, 0xf4, 0xd6, 0x2e, 0x6f, 0x9b, 0x8f, - 0x82, 0x35, 0xfe, 0x03, 0x41, 0x4d, 0xce, 0xc9, 0x88, 0x30, 0xfc, 0x05, 0x54, 0xd4, 0xc8, 0xc4, - 0x05, 0xf7, 0xe2, 0x27, 0xa3, 0xb7, 0xfe, 0xca, 0x7e, 0x7a, 0xcf, 0x07, 0x48, 0xb6, 0x89, 0xa2, - 0xa8, 0x18, 0x5d, 0x9c, 0xa1, 0xc5, 0xe8, 0x0b, 0xb3, 0xca, 0xb3, 0xf0, 0x67, 0x60, 0xcb, 0x06, - 0x29, 0xa6, 0x5f, 0xe8, 0xe4, 0x62, 0xfa, 0xc5, 0x3e, 0x92, 0xcf, 0x8e, 0x7f, 0x00, 0x27, 0x95, - 0x05, 0x7e, 0x0c, 0xed, 0x8b, 0x8c, 0xe2, 0xb7, 0x0a, 0x91, 0x17, 0xb5, 0xd6, 0x1b, 0x14, 0x8e, - 0xae, 0x2c, 0x83, 0x67, 0x0d, 0xd1, 0xe4, 0xce, 0xf1, 0xa9, 0x6b, 0xbd, 0x38, 0x75, 0xad, 0x97, - 0xa7, 0x2e, 0xfa, 0x79, 0xe9, 0xa2, 0x5f, 0x97, 0x2e, 0x7a, 0xbe, 0x74, 0xd1, 0xf1, 0xd2, 0x45, - 0x7f, 0x2d, 0x5d, 0xf4, 0xf7, 0xd2, 0xb5, 0x5e, 0x2e, 0x5d, 0xf4, 0xec, 0xcc, 0xb5, 0x8e, 0xcf, - 0x5c, 0xeb, 0xc5, 0x99, 0x6b, 0xed, 0x55, 0xd5, 0xbd, 0x1f, 0xfd, 0x1b, 0x00, 0x00, 0xff, 0xff, - 0x95, 0xdc, 0x79, 0x4b, 0x37, 0x08, 0x00, 0x00, + 0x14, 0xdf, 0xb1, 0xd7, 0x6b, 0xfb, 0xf9, 0x4f, 0xac, 0xa1, 0x24, 0xc6, 0xa0, 0xb5, 0x35, 0x87, + 0xd6, 0x2a, 0xc2, 0x81, 0x50, 0x68, 0x1a, 0xfe, 0x29, 0x6e, 0x89, 0x48, 0x40, 0xa2, 0xdd, 0x44, + 0xe2, 0x84, 0xaa, 0x4d, 0x76, 0xe2, 0xac, 0x6a, 0xef, 0xba, 0x3b, 0x63, 0xa4, 0xdc, 0xf8, 0x02, + 0x48, 0xbd, 0x71, 0xe0, 0x0b, 0x20, 0x0e, 0x7c, 0x8e, 0x1e, 0x73, 0xec, 0x29, 0x10, 0xe7, 0x82, + 0x22, 0x21, 0xf5, 0x0b, 0x20, 0xa1, 0xf9, 0xb3, 0xeb, 0x89, 0x13, 0xa8, 0xdc, 0xcb, 0xee, 0xbc, + 0x99, 0xf7, 0x66, 0xde, 0xef, 0xf7, 0x7e, 0x6f, 0x06, 0xea, 0xc3, 0x78, 0x30, 0x4e, 0x62, 0x1e, + 0xf7, 0xe4, 0x17, 0x97, 0x52, 0xbb, 0xd5, 0x1e, 0xc4, 0xf1, 0x60, 0x48, 0x57, 0xa5, 0xb5, 0x3f, + 0x39, 0x5c, 0xe5, 0xe1, 0x88, 0x32, 0xee, 0x8f, 0xc6, 0xca, 0xb5, 0xf5, 0xde, 0x20, 0xe4, 0x47, + 0x93, 0xfd, 0xde, 0x41, 0x3c, 0x5a, 0x1d, 0xc4, 0x83, 0x78, 0xe6, 0x29, 0x2c, 0x69, 0xc8, 0x91, + 0x72, 0x27, 0x5b, 0x50, 0x79, 
0x38, 0x61, 0x47, 0x1e, 0x7d, 0x3a, 0xa1, 0x8c, 0xe3, 0xbb, 0x50, + 0x64, 0x3c, 0xa1, 0xfe, 0x88, 0x35, 0x51, 0x27, 0xdf, 0xad, 0xac, 0x35, 0x7a, 0x59, 0x2a, 0xbb, + 0x72, 0xa1, 0x5f, 0xb9, 0x38, 0x6d, 0xa7, 0x4e, 0x5e, 0x3a, 0x20, 0x75, 0xa8, 0xaa, 0x7d, 0xd8, + 0x38, 0x8e, 0x18, 0x25, 0x7f, 0x23, 0xa8, 0x3e, 0x9a, 0xd0, 0xe4, 0x38, 0xdd, 0xb9, 0x05, 0x25, + 0x46, 0x87, 0xf4, 0x80, 0xc7, 0x49, 0x13, 0x75, 0x50, 0xb7, 0xec, 0x65, 0x36, 0xbe, 0x01, 0x85, + 0x61, 0x38, 0x0a, 0x79, 0x33, 0xd7, 0x41, 0xdd, 0x9a, 0xa7, 0x0c, 0xbc, 0x01, 0x05, 0xc6, 0xfd, + 0x84, 0x37, 0xf3, 0x1d, 0xd4, 0xad, 0xac, 0xb5, 0x7a, 0x0a, 0x7a, 0x2f, 0x05, 0xd4, 0xdb, 0x4b, + 0xa1, 0xf7, 0x4b, 0xcf, 0x4f, 0xdb, 0xd6, 0xb3, 0x3f, 0xda, 0xc8, 0x53, 0x21, 0xf8, 0x63, 0xc8, + 0xd3, 0x28, 0x68, 0xda, 0x0b, 0x44, 0x8a, 0x00, 0xfc, 0x01, 0x94, 0x83, 0x30, 0xa1, 0x07, 0x3c, + 0x8c, 0xa3, 0x66, 0xa1, 0x83, 0xba, 0xf5, 0xb5, 0x37, 0x66, 0x0c, 0x3c, 0x48, 0x97, 0xbc, 0x99, + 0xd7, 0x8e, 0x5d, 0x72, 0x1a, 0x45, 0xf2, 0x09, 0xd4, 0x34, 0x5c, 0x45, 0x00, 0xbe, 0xfd, 0x4a, + 0x26, 0x67, 0xe4, 0xfd, 0x8e, 0xa0, 0xfa, 0x8d, 0xbf, 0x4f, 0x87, 0x29, 0x59, 0x18, 0xec, 0xc8, + 0x1f, 0x51, 0x4d, 0x94, 0x1c, 0xe3, 0x65, 0x70, 0x7e, 0xf0, 0x87, 0x13, 0xca, 0x24, 0x4b, 0x25, + 0x4f, 0x5b, 0x8b, 0xd2, 0x84, 0x5e, 0x9b, 0x26, 0x94, 0xd1, 0x44, 0x6e, 0x41, 0x4d, 0xe7, 0xab, + 0xd1, 0xce, 0x92, 0x13, 0x60, 0xcb, 0x69, 0x72, 0xe4, 0x08, 0x1c, 0x05, 0x16, 0x13, 0x70, 0x86, + 0x22, 0x84, 0x29, 0x50, 0x7d, 0xb8, 0x38, 0x6d, 0xeb, 0x19, 0x4f, 0xff, 0xf1, 0x06, 0x14, 0x69, + 0xc4, 0x93, 0x50, 0x62, 0x14, 0x9c, 0x2d, 0xcd, 0x38, 0xfb, 0x32, 0xe2, 0xc9, 0x71, 0x7f, 0x49, + 0x94, 0x4b, 0x08, 0x50, 0xfb, 0x79, 0xe9, 0x80, 0xc4, 0x50, 0x90, 0x2e, 0xf8, 0x2b, 0x28, 0x67, + 0x3d, 0x21, 0xcf, 0xfa, 0x7f, 0x64, 0x75, 0xbd, 0x63, 0x8e, 0x33, 0x89, 0x6f, 0x16, 0x8c, 0xdf, + 0x01, 0x7b, 0x18, 0x46, 0x54, 0xf2, 0x5d, 0xee, 0x97, 0x2e, 0x4e, 0xdb, 0xd2, 0xf6, 0xe4, 0x97, + 0xfc, 0x8c, 0xa0, 0xb2, 0xe7, 0x87, 0x59, 0xcd, 0x6e, 0x40, 0xe1, 
0xa9, 0x50, 0x80, 0x2e, 0x9a, + 0x32, 0x84, 0xec, 0x03, 0x3a, 0xf4, 0x8f, 0xb7, 0xe2, 0x44, 0x16, 0xa8, 0xe6, 0x65, 0xf6, 0x4c, + 0xf6, 0xf6, 0xb5, 0xb2, 0x2f, 0x2c, 0x2c, 0xfb, 0x1d, 0xbb, 0x94, 0x6b, 0xe4, 0xc9, 0x31, 0x54, + 0x55, 0x62, 0xba, 0x38, 0x5d, 0x70, 0x94, 0xd2, 0x34, 0x1d, 0x57, 0x95, 0xa8, 0xd7, 0xf1, 0x17, + 0x50, 0x0f, 0x92, 0x78, 0x3c, 0xa6, 0xc1, 0xae, 0xd6, 0xae, 0xaa, 0xc3, 0x8a, 0xd1, 0x03, 0xe6, + 0xba, 0x37, 0xe7, 0x4e, 0x7e, 0x41, 0x50, 0xdb, 0xa5, 0xb2, 0x32, 0x9a, 0x96, 0x0c, 0x0e, 0x7a, + 0xed, 0x2e, 0xce, 0x2d, 0xda, 0xc5, 0xcb, 0xe0, 0x0c, 0x92, 0x78, 0x32, 0x66, 0xcd, 0xbc, 0x52, + 0xa3, 0xb2, 0xc8, 0x0e, 0xd4, 0xd3, 0xe4, 0x34, 0x35, 0xeb, 0xe0, 0x30, 0x39, 0xa3, 0x9b, 0xb4, + 0x65, 0x50, 0x23, 0xe7, 0xb7, 0x03, 0x1a, 0xf1, 0xf0, 0x30, 0xa4, 0x49, 0xdf, 0x16, 0x87, 0x78, + 0xda, 0x9f, 0xfc, 0x84, 0xa0, 0x31, 0xef, 0x82, 0x3f, 0x37, 0x44, 0x2e, 0xb6, 0xbb, 0xf9, 0xdf, + 0xdb, 0xf5, 0x64, 0x03, 0x31, 0xa9, 0xd9, 0xb4, 0x01, 0x5a, 0xf7, 0xa0, 0x62, 0x4c, 0xe3, 0x06, + 0xe4, 0x9f, 0xd0, 0x54, 0x50, 0x62, 0x28, 0x24, 0x23, 0x3b, 0x4b, 0x69, 0xd2, 0x53, 0xc6, 0x46, + 0x6e, 0x1d, 0x09, 0x39, 0xd6, 0x2e, 0xd5, 0x06, 0xaf, 0x83, 0x7d, 0x98, 0xc4, 0xa3, 0x85, 0x88, + 0x97, 0x11, 0xf8, 0x0e, 0xe4, 0x78, 0xbc, 0x10, 0xed, 0x39, 0x1e, 0x0b, 0xd6, 0x35, 0xf8, 0xbc, + 0x4c, 0x4e, 0x5b, 0xe4, 0x37, 0x04, 0x4b, 0x22, 0x46, 0x31, 0x70, 0xff, 0x68, 0x12, 0x3d, 0xc1, + 0x5d, 0x68, 0x88, 0x93, 0x1e, 0x87, 0xd1, 0x80, 0x32, 0x4e, 0x93, 0xc7, 0x61, 0xa0, 0x61, 0xd6, + 0xc5, 0xfc, 0xb6, 0x9e, 0xde, 0x0e, 0xf0, 0x0a, 0x14, 0x27, 0x4c, 0x39, 0x28, 0xcc, 0x8e, 0x30, + 0xb7, 0x03, 0xfc, 0xae, 0x71, 0x9c, 0xe0, 0xda, 0xb8, 0xa7, 0x25, 0x87, 0x0f, 0xfd, 0x30, 0xc9, + 0x6e, 0x96, 0x5b, 0xe0, 0x1c, 0x88, 0x83, 0x59, 0xd3, 0x9e, 0xbf, 0x58, 0x64, 0x42, 0x9e, 0x5e, + 0x26, 0x1f, 0x41, 0x39, 0x8b, 0xbe, 0xf6, 0x1a, 0xbe, 0xb6, 0x02, 0xe4, 0x6d, 0x28, 0x28, 0x60, + 0x18, 0xec, 0xc0, 0xe7, 0xbe, 0x0c, 0xa9, 0x7a, 0x72, 0x4c, 0x9a, 0xb0, 0xbc, 0x97, 0xf8, 0x11, + 0x3b, 
0xa4, 0x89, 0x74, 0xca, 0xe4, 0x77, 0xfb, 0x26, 0x94, 0xb3, 0x37, 0x05, 0x57, 0xa0, 0xb8, + 0xf5, 0xad, 0xf7, 0xdd, 0xa6, 0xf7, 0xa0, 0x61, 0xe1, 0x2a, 0x94, 0xfa, 0x9b, 0xf7, 0xbf, 0x96, + 0x16, 0x5a, 0xdb, 0x04, 0x47, 0xbc, 0xae, 0x34, 0xc1, 0x77, 0xc1, 0x16, 0x23, 0xfc, 0xe6, 0x0c, + 0x80, 0xf1, 0x7e, 0xb7, 0x96, 0xe7, 0xa7, 0xf5, 0x73, 0x6c, 0xad, 0xfd, 0x83, 0xa0, 0x28, 0x5e, + 0x28, 0x21, 0xd3, 0x4f, 0xa1, 0x20, 0x1f, 0x2b, 0x6c, 0xb8, 0x9b, 0x8f, 0x75, 0x6b, 0xe5, 0xca, + 0x7c, 0xba, 0xcf, 0xfb, 0x48, 0x74, 0xb4, 0xa4, 0xc8, 0x8c, 0x36, 0x5f, 0x2f, 0x33, 0xfa, 0xd2, + 0x2b, 0x41, 0x2c, 0x7c, 0x0f, 0x6c, 0x71, 0x35, 0x99, 0xe9, 0x1b, 0x77, 0xa8, 0x99, 0xbe, 0x79, + 0x83, 0xc9, 0x63, 0x3f, 0x03, 0x47, 0x29, 0x08, 0xaf, 0xcc, 0x77, 0x55, 0x1a, 0xde, 0xbc, 0xba, + 0x90, 0xe1, 0xff, 0x1e, 0x4a, 0xa9, 0xaa, 0xf0, 0x23, 0xa8, 0x5f, 0x2e, 0x08, 0x7e, 0xcb, 0x38, + 0xf8, 0xb2, 0x54, 0x5b, 0x1d, 0x63, 0xe9, 0xda, 0x2a, 0x12, 0xab, 0x8b, 0xfa, 0x77, 0x4e, 0xce, + 0x5c, 0xeb, 0xc5, 0x99, 0x6b, 0xbd, 0x3c, 0x73, 0xd1, 0x8f, 0x53, 0x17, 0xfd, 0x3a, 0x75, 0xd1, + 0xf3, 0xa9, 0x8b, 0x4e, 0xa6, 0x2e, 0xfa, 0x73, 0xea, 0xa2, 0xbf, 0xa6, 0xae, 0xf5, 0x72, 0xea, + 0xa2, 0x67, 0xe7, 0xae, 0x75, 0x72, 0xee, 0x5a, 0x2f, 0xce, 0x5d, 0x6b, 0xdf, 0x91, 0xfb, 0x7e, + 0xf8, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x61, 0x39, 0x2d, 0x52, 0xf0, 0x09, 0x00, 0x00, } func (x Direction) String() string { @@ -1236,6 +1393,99 @@ func (this *TailResponse) Equal(that interface{}) bool { } return true } +func (this *SeriesRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*SeriesRequest) + if !ok { + that2, ok := that.(SeriesRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Start.Equal(that1.Start) { + return false + } + if !this.End.Equal(that1.End) { + return false + } + if len(this.Groups) != len(that1.Groups) { + return false 
+ } + for i := range this.Groups { + if this.Groups[i] != that1.Groups[i] { + return false + } + } + return true +} +func (this *SeriesResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*SeriesResponse) + if !ok { + that2, ok := that.(SeriesResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Series) != len(that1.Series) { + return false + } + for i := range this.Series { + if !this.Series[i].Equal(&that1.Series[i]) { + return false + } + } + return true +} +func (this *SeriesIdentifier) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*SeriesIdentifier) + if !ok { + that2, ok := that.(SeriesIdentifier) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Labels) != len(that1.Labels) { + return false + } + for i := range this.Labels { + if this.Labels[i] != that1.Labels[i] { + return false + } + } + return true +} func (this *DroppedStream) Equal(that interface{}) bool { if that == nil { return this == nil @@ -1507,6 +1757,56 @@ func (this *TailResponse) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *SeriesRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&logproto.SeriesRequest{") + s = append(s, "Start: "+fmt.Sprintf("%#v", this.Start)+",\n") + s = append(s, "End: "+fmt.Sprintf("%#v", this.End)+",\n") + s = append(s, "Groups: "+fmt.Sprintf("%#v", this.Groups)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *SeriesResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logproto.SeriesResponse{") + if this.Series != nil { + vs := make([]*SeriesIdentifier, 
len(this.Series)) + for i := range vs { + vs[i] = &this.Series[i] + } + s = append(s, "Series: "+fmt.Sprintf("%#v", vs)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *SeriesIdentifier) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logproto.SeriesIdentifier{") + keysForLabels := make([]string, 0, len(this.Labels)) + for k, _ := range this.Labels { + keysForLabels = append(keysForLabels, k) + } + github_com_gogo_protobuf_sortkeys.Strings(keysForLabels) + mapStringForLabels := "map[string]string{" + for _, k := range keysForLabels { + mapStringForLabels += fmt.Sprintf("%#v: %#v,", k, this.Labels[k]) + } + mapStringForLabels += "}" + if this.Labels != nil { + s = append(s, "Labels: "+mapStringForLabels+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} func (this *DroppedStream) GoString() string { if this == nil { return "nil" @@ -1662,6 +1962,7 @@ type QuerierClient interface { Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Querier_QueryClient, error) Label(ctx context.Context, in *LabelRequest, opts ...grpc.CallOption) (*LabelResponse, error) Tail(ctx context.Context, in *TailRequest, opts ...grpc.CallOption) (Querier_TailClient, error) + Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (*SeriesResponse, error) } type querierClient struct { @@ -1745,11 +2046,21 @@ func (x *querierTailClient) Recv() (*TailResponse, error) { return m, nil } +func (c *querierClient) Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (*SeriesResponse, error) { + out := new(SeriesResponse) + err := c.cc.Invoke(ctx, "/logproto.Querier/Series", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // QuerierServer is the server API for Querier service. 
type QuerierServer interface { Query(*QueryRequest, Querier_QueryServer) error Label(context.Context, *LabelRequest) (*LabelResponse, error) Tail(*TailRequest, Querier_TailServer) error + Series(context.Context, *SeriesRequest) (*SeriesResponse, error) } // UnimplementedQuerierServer can be embedded to have forward compatible implementations. @@ -1765,6 +2076,9 @@ func (*UnimplementedQuerierServer) Label(ctx context.Context, req *LabelRequest) func (*UnimplementedQuerierServer) Tail(req *TailRequest, srv Querier_TailServer) error { return status.Errorf(codes.Unimplemented, "method Tail not implemented") } +func (*UnimplementedQuerierServer) Series(ctx context.Context, req *SeriesRequest) (*SeriesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Series not implemented") +} func RegisterQuerierServer(s *grpc.Server, srv QuerierServer) { s.RegisterService(&_Querier_serviceDesc, srv) @@ -1830,6 +2144,24 @@ func (x *querierTailServer) Send(m *TailResponse) error { return x.ServerStream.SendMsg(m) } +func _Querier_Series_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SeriesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QuerierServer).Series(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/logproto.Querier/Series", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QuerierServer).Series(ctx, req.(*SeriesRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Querier_serviceDesc = grpc.ServiceDesc{ ServiceName: "logproto.Querier", HandlerType: (*QuerierServer)(nil), @@ -1838,6 +2170,10 @@ var _Querier_serviceDesc = grpc.ServiceDesc{ MethodName: "Label", Handler: _Querier_Label_Handler, }, + { + MethodName: "Series", + Handler: _Querier_Series_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -2384,7 
+2720,7 @@ func (m *TailResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *DroppedStream) Marshal() (dAtA []byte, err error) { +func (m *SeriesRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -2394,24 +2730,26 @@ func (m *DroppedStream) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *DroppedStream) MarshalTo(dAtA []byte) (int, error) { +func (m *SeriesRequest) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *DroppedStream) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Labels) > 0 { - i -= len(m.Labels) - copy(dAtA[i:], m.Labels) - i = encodeVarintLogproto(dAtA, i, uint64(len(m.Labels))) - i-- - dAtA[i] = 0x1a + if len(m.Groups) > 0 { + for iNdEx := len(m.Groups) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Groups[iNdEx]) + copy(dAtA[i:], m.Groups[iNdEx]) + i = encodeVarintLogproto(dAtA, i, uint64(len(m.Groups[iNdEx]))) + i-- + dAtA[i] = 0x1a + } } - n8, err8 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.To, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.To):]) + n8, err8 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.End, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.End):]) if err8 != nil { return 0, err8 } @@ -2419,7 +2757,7 @@ func (m *DroppedStream) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintLogproto(dAtA, i, uint64(n8)) i-- dAtA[i] = 0x12 - n9, err9 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.From, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.From):]) + n9, err9 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Start, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Start):]) if err9 != nil { return 0, err9 } @@ -2430,7 +2768,7 @@ func (m *DroppedStream) 
MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *TimeSeriesChunk) Marshal() (dAtA []byte, err error) { +func (m *SeriesResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -2440,20 +2778,20 @@ func (m *TimeSeriesChunk) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *TimeSeriesChunk) MarshalTo(dAtA []byte) (int, error) { +func (m *SeriesResponse) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *TimeSeriesChunk) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *SeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int _ = l - if len(m.Chunks) > 0 { - for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- { + if len(m.Series) > 0 { + for iNdEx := len(m.Series) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + size, err := m.Series[iNdEx].MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -2461,15 +2799,140 @@ func (m *TimeSeriesChunk) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintLogproto(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x22 + dAtA[i] = 0xa } } - if len(m.Labels) > 0 { - for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err + return len(dAtA) - i, nil +} + +func (m *SeriesIdentifier) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SeriesIdentifier) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeriesIdentifier) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if 
len(m.Labels) > 0 { + for k := range m.Labels { + v := m.Labels[k] + baseI := i + i -= len(v) + copy(dAtA[i:], v) + i = encodeVarintLogproto(dAtA, i, uint64(len(v))) + i-- + dAtA[i] = 0x12 + i -= len(k) + copy(dAtA[i:], k) + i = encodeVarintLogproto(dAtA, i, uint64(len(k))) + i-- + dAtA[i] = 0xa + i = encodeVarintLogproto(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *DroppedStream) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DroppedStream) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DroppedStream) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Labels) > 0 { + i -= len(m.Labels) + copy(dAtA[i:], m.Labels) + i = encodeVarintLogproto(dAtA, i, uint64(len(m.Labels))) + i-- + dAtA[i] = 0x1a + } + n10, err10 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.To, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.To):]) + if err10 != nil { + return 0, err10 + } + i -= n10 + i = encodeVarintLogproto(dAtA, i, uint64(n10)) + i-- + dAtA[i] = 0x12 + n11, err11 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.From, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.From):]) + if err11 != nil { + return 0, err11 + } + i -= n11 + i = encodeVarintLogproto(dAtA, i, uint64(n11)) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} + +func (m *TimeSeriesChunk) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TimeSeriesChunk) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TimeSeriesChunk) 
MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Chunks) > 0 { + for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintLogproto(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + } + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err } i -= size i = encodeVarintLogproto(dAtA, i, uint64(size)) @@ -2771,6 +3234,57 @@ func (m *TailResponse) Size() (n int) { return n } +func (m *SeriesRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Start) + n += 1 + l + sovLogproto(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.End) + n += 1 + l + sovLogproto(uint64(l)) + if len(m.Groups) > 0 { + for _, s := range m.Groups { + l = len(s) + n += 1 + l + sovLogproto(uint64(l)) + } + } + return n +} + +func (m *SeriesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Series) > 0 { + for _, e := range m.Series { + l = e.Size() + n += 1 + l + sovLogproto(uint64(l)) + } + } + return n +} + +func (m *SeriesIdentifier) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for k, v := range m.Labels { + _ = k + _ = v + mapEntrySize := 1 + len(k) + sovLogproto(uint64(len(k))) + 1 + len(v) + sovLogproto(uint64(len(v))) + n += mapEntrySize + 1 + sovLogproto(uint64(mapEntrySize)) + } + } + return n +} + func (m *DroppedStream) Size() (n int) { if m == nil { return 0 @@ -2994,6 +3508,53 @@ func (this *TailResponse) String() string { }, "") return s } +func (this *SeriesRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&SeriesRequest{`, + `Start:` + 
strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Start), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `End:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.End), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Groups:` + fmt.Sprintf("%v", this.Groups) + `,`, + `}`, + }, "") + return s +} +func (this *SeriesResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForSeries := "[]SeriesIdentifier{" + for _, f := range this.Series { + repeatedStringForSeries += strings.Replace(strings.Replace(f.String(), "SeriesIdentifier", "SeriesIdentifier", 1), `&`, ``, 1) + "," + } + repeatedStringForSeries += "}" + s := strings.Join([]string{`&SeriesResponse{`, + `Series:` + repeatedStringForSeries + `,`, + `}`, + }, "") + return s +} +func (this *SeriesIdentifier) String() string { + if this == nil { + return "nil" + } + keysForLabels := make([]string, 0, len(this.Labels)) + for k, _ := range this.Labels { + keysForLabels = append(keysForLabels, k) + } + github_com_gogo_protobuf_sortkeys.Strings(keysForLabels) + mapStringForLabels := "map[string]string{" + for _, k := range keysForLabels { + mapStringForLabels += fmt.Sprintf("%v: %v,", k, this.Labels[k]) + } + mapStringForLabels += "}" + s := strings.Join([]string{`&SeriesIdentifier{`, + `Labels:` + mapStringForLabels + `,`, + `}`, + }, "") + return s +} func (this *DroppedStream) String() string { if this == nil { return "nil" @@ -4261,6 +4822,424 @@ func (m *TailResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *SeriesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType 
== 4 { + return fmt.Errorf("proto: SeriesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SeriesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Start", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Start, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field End", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.End, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Groups", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 
0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Groups = append(m.Groups, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SeriesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SeriesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SeriesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Series = 
append(m.Series, SeriesIdentifier{}) + if err := m.Series[len(m.Series)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SeriesIdentifier) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SeriesIdentifier: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SeriesIdentifier: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Labels == nil { + m.Labels = make(map[string]string) + } + var mapkey string + var mapvalue string + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := 
uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthLogproto + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthLogproto + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthLogproto + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue < 0 { + return ErrInvalidLengthLogproto + } + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + } else { + iNdEx = entryPreIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.Labels[mapkey] = mapvalue + iNdEx = postIndex + default: + 
iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *DroppedStream) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index 972b856e09444..bd6e934223e9d 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -13,6 +13,7 @@ service Querier { rpc Query(QueryRequest) returns (stream QueryResponse) {}; rpc Label(LabelRequest) returns (LabelResponse) {}; rpc Tail(TailRequest) returns (stream TailResponse) {}; + rpc Series(SeriesRequest) returns (SeriesResponse) {}; } service Ingester { @@ -78,6 +79,20 @@ message TailResponse { repeated DroppedStream droppedStreams = 2; } +message SeriesRequest { + google.protobuf.Timestamp start = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp end = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + repeated string groups = 3; +} + +message SeriesResponse { + repeated SeriesIdentifier series = 1 [(gogoproto.nullable) = false]; +} + +message SeriesIdentifier { + map labels = 1; +} + message DroppedStream { google.protobuf.Timestamp from = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; google.protobuf.Timestamp to = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; @@ -102,4 +117,4 @@ message Chunk { message TransferChunksResponse { -} +} \ No newline at end of file diff --git a/pkg/logql/marshal/marshal.go b/pkg/logql/marshal/marshal.go index aafc7a7392284..088d3daca7e16 100644 --- a/pkg/logql/marshal/marshal.go +++ b/pkg/logql/marshal/marshal.go @@ -56,3 +56,25 @@ func WriteTailResponseJSON(r legacy.TailResponse, c *websocket.Conn) 
error { return c.WriteJSON(v1Response) } + +// WriteSeriesResponseJSON marshals a logproto.SeriesResponse to v1 loghttp JSON and then +// writes it to the provided io.Writer. +func WriteSeriesResponseJSON(r logproto.SeriesResponse, w io.Writer) error { + adapter := &seriesResponseAdapter{ + Status: "success", + Data: make([]map[string]string, 0, len(r.GetSeries())), + } + + for _, series := range r.GetSeries() { + adapter.Data = append(adapter.Data, series.GetLabels()) + } + + return json.NewEncoder(w).Encode(adapter) +} + +// This struct exists primarily because we can't specify a repeated map in proto v3. +// Otherwise, we'd use that + gogoproto.jsontag to avoid this layer of indirection +type seriesResponseAdapter struct { + Status string `json:"status"` + Data []map[string]string `json:"data"` +} diff --git a/pkg/logql/marshal/marshal_test.go b/pkg/logql/marshal/marshal_test.go index e81e569ac98d0..63d187f581f62 100644 --- a/pkg/logql/marshal/marshal_test.go +++ b/pkg/logql/marshal/marshal_test.go @@ -2,6 +2,7 @@ package marshal import ( "bytes" + "fmt" "testing" "time" @@ -367,6 +368,42 @@ func Test_TailResponseMarshalLoop(t *testing.T) { } } +func Test_WriteSeriesResponseJSON(t *testing.T) { + + for i, tc := range []struct { + input logproto.SeriesResponse + expected string + }{ + { + logproto.SeriesResponse{ + Series: []logproto.SeriesIdentifier{ + { + Labels: map[string]string{ + "a": "1", + "b": "2", + }, + }, + { + Labels: map[string]string{ + "c": "3", + "d": "4", + }, + }, + }, + }, + `{"status":"success","data":[{"a":"1","b":"2"},{"c":"3","d":"4"}]}`, + }, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + var b bytes.Buffer + err := WriteSeriesResponseJSON(tc.input, &b) + require.NoError(t, err) + + testJSONBytesEqual(t, []byte(tc.expected), b.Bytes(), "Label Test %d failed", i) + }) + } +} + func testJSONBytesEqual(t *testing.T, expected []byte, actual []byte, msg string, args ...interface{}) { var expectedValue map[string]interface{} err := 
json.Unmarshal(expected, &expectedValue) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index f34854bebf241..a5135d153ae75 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -173,11 +173,13 @@ func (t *Loki) initQuerier() (err error) { t.server.HTTP.Handle("/loki/api/v1/labels", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) t.server.HTTP.Handle("/loki/api/v1/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) t.server.HTTP.Handle("/loki/api/v1/tail", httpMiddleware.Wrap(http.HandlerFunc(t.querier.TailHandler))) + t.server.HTTP.Handle("/loki/api/v1/series", httpMiddleware.Wrap(http.HandlerFunc(t.querier.SeriesHandler))) t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LogQueryHandler))) t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler))) t.server.HTTP.Handle("/api/prom/tail", httpMiddleware.Wrap(http.HandlerFunc(t.querier.TailHandler))) + t.server.HTTP.Handle("/api/prom/series", httpMiddleware.Wrap(http.HandlerFunc(t.querier.SeriesHandler))) return } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 0a83f31d85b9a..77925bed41c8e 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -218,6 +218,28 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { } } +// SeriesHandler returns the list of time series that match a certain label set. 
+// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers +func (q *Querier) SeriesHandler(w http.ResponseWriter, r *http.Request) { + req, err := loghttp.ParseSeriesQuery(r) + if err != nil { + http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) + return + } + + resp, err := q.Series(r.Context(), req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + err = marshal.WriteSeriesResponseJSON(*resp, w) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + // NewPrepopulateMiddleware creates a middleware which will parse incoming http forms. // This is important because some endpoints can POST x-www-form-urlencoded bodies instead of GET w/ query strings. func NewPrepopulateMiddleware() middleware.Interface { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 2e8030a74cf9b..8fc7f63afe10b 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -19,8 +19,10 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/marshal" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/util/validation" ) @@ -377,6 +379,130 @@ func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.T return reconnectClientsMap, nil } +// Series fetches any matching series for a list of matcher sets +func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + if err = q.validateQueryTimeRange(userID, &req.Start, &req.End); err != nil { + return nil, err + } + + // Enforce the query timeout while querying backends + ctx, cancel := 
context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout)) + defer cancel() + + return q.awaitSeries(ctx, req) + +} + +func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { + + // buffer the channels to the # of calls they're expecting su + series := make(chan [][]logproto.SeriesIdentifier, 2) + errs := make(chan error, 2) + + // fetch series from ingesters and store concurrently + + go func() { + // fetch series identifiers from ingesters + resps, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) { + return client.Series(ctx, req) + }) + if err != nil { + errs <- err + return + } + var acc [][]logproto.SeriesIdentifier + for _, resp := range resps { + acc = append(acc, resp.response.(*logproto.SeriesResponse).Series) + } + series <- acc + }() + + go func() { + storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups()) + if err != nil { + errs <- err + return + } + series <- [][]logproto.SeriesIdentifier{storeValues} + }() + + var sets [][]logproto.SeriesIdentifier + for i := 0; i < 2; i++ { + select { + case err := <-errs: + return nil, err + case s := <-series: + sets = append(sets, s...) 
+ } + } + + deduped := make(map[string]logproto.SeriesIdentifier) + for _, set := range sets { + for _, s := range set { + key := loghttp.LabelSet(s.Labels).String() + if _, exists := deduped[key]; !exists { + deduped[key] = s + } + } + } + + response := &logproto.SeriesResponse{ + Series: make([]logproto.SeriesIdentifier, 0, len(deduped)), + } + + for _, s := range deduped { + response.Series = append(response.Series, s) + } + + return response, nil +} + +// seriesForMatchers fetches series from the store for each matcher set +// TODO: make efficient if/when the index supports labels so we don't have to read chunks +func (q *Querier) seriesForMatchers( + ctx context.Context, + from, through time.Time, + groups []string, +) ([]logproto.SeriesIdentifier, error) { + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var results []logproto.SeriesIdentifier + for _, group := range groups { + iter, err := q.store.LazyQuery(ctx, logql.SelectParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: group, + Limit: 1, + Start: from, + End: through, + Direction: logproto.FORWARD, + }, + }) + if err != nil { + return nil, err + } + + for iter.Next() { + ls, err := marshal.NewLabelSet(iter.Labels()) + if err != nil { + return nil, err + } + + results = append(results, logproto.SeriesIdentifier{ + Labels: ls.Map(), + }) + } + } + return results, nil + +} + func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryRequest) error { userID, err := user.ExtractOrgID(ctx) if err != nil { diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 8bd74c8d927c8..8516d627ead83 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -51,6 +51,15 @@ func (c *querierClientMock) Tail(ctx context.Context, in *logproto.TailRequest, return args.Get(0).(logproto.Querier_TailClient), args.Error(1) } +func (c *querierClientMock) Series(ctx context.Context, in *logproto.SeriesRequest, opts 
...grpc.CallOption) (*logproto.SeriesResponse, error) { + args := c.Called(ctx, in) + res := args.Get(0) + if res == nil { + return (*logproto.SeriesResponse)(nil), args.Error(1) + } + return res.(*logproto.SeriesResponse), args.Error(1) +} + // newIngesterClientMockFactory creates a factory function always returning // the input querierClientMock func newIngesterClientMockFactory(c *querierClientMock) cortex_client.Factory { @@ -173,7 +182,11 @@ func newStoreMock() *storeMock { func (s *storeMock) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) { args := s.Called(ctx, req) - return args.Get(0).(iter.EntryIterator), args.Error(1) + res := args.Get(0) + if res == nil { + return iter.EntryIterator(nil), args.Error(1) + } + return res.(iter.EntryIterator), args.Error(1) } func (s *storeMock) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) { @@ -271,9 +284,22 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator { return iter.NewStreamIterator(mockStream(from, quantity)) } +func mockStreamIterFromLabelSets(from, quantity int, sets []string) iter.EntryIterator { + var streams []*logproto.Stream + for _, s := range sets { + streams = append(streams, mockStreamWithLabels(from, quantity, s)) + } + + return iter.NewStreamsIterator(streams, logproto.FORWARD) +} + // mockStream return a stream with quantity entries, where entries timestamp and // line string are constructed as sequential numbers starting at from func mockStream(from int, quantity int) *logproto.Stream { + return mockStreamWithLabels(from, quantity, `{type="test"}`) +} + +func mockStreamWithLabels(from int, quantity int, labels string) *logproto.Stream { entries := make([]logproto.Entry, 0, quantity) for i := from; i < from+quantity; i++ { @@ -285,6 +311,6 @@ func mockStream(from int, quantity int) *logproto.Stream { return &logproto.Stream{ Entries: entries, - Labels: `{type="test"}`, + 
Labels: labels, } } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 2a197828b95b1..3a48ea4450d3e 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -2,6 +2,7 @@ package querier import ( "context" + "errors" "net/http" "testing" "time" @@ -294,3 +295,151 @@ func TestQuerier_validateQueryRequest(t *testing.T) { _, err = q.Select(ctx, logql.SelectParams{QueryRequest: &request}) require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, length > limit (3m0s > 2m0s)"), err) } + +func TestQuerier_SeriesAPI(t *testing.T) { + mkReq := func(groups []string) *logproto.SeriesRequest { + return &logproto.SeriesRequest{ + Start: time.Unix(0, 0), + End: time.Unix(10, 0), + Groups: groups, + } + } + + mockSeriesResponse := func(series []map[string]string) *logproto.SeriesResponse { + resp := &logproto.SeriesResponse{} + for _, s := range series { + resp.Series = append(resp.Series, logproto.SeriesIdentifier{ + Labels: s, + }) + } + return resp + } + + for _, tc := range []struct { + desc string + req *logproto.SeriesRequest + setup func(*storeMock, *queryClientMock, *querierClientMock, validation.Limits, *logproto.SeriesRequest) + run func(*testing.T, *Querier, *logproto.SeriesRequest) + }{ + { + "ingester error", + mkReq([]string{`{a="1"}`}), + func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) { + ingester.On("Series", mock.Anything, req, mock.Anything).Return(nil, errors.New("tst-err")) + + store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(0, 0), nil) + }, + func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { + ctx := user.InjectOrgID(context.Background(), "test") + _, err := q.Series(ctx, req) + require.Error(t, err) + }, + }, + { + "store error", + mkReq([]string{`{a="1"}`}), + func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req 
*logproto.SeriesRequest) { + ingester.On("Series", mock.Anything, req, mock.Anything).Return(mockSeriesResponse([]map[string]string{ + {"a": "1"}, + }), nil) + + store.On("LazyQuery", mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded) + }, + func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { + ctx := user.InjectOrgID(context.Background(), "test") + _, err := q.Series(ctx, req) + require.Error(t, err) + }, + }, + { + "no matches", + mkReq([]string{`{a="1"}`}), + func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) { + ingester.On("Series", mock.Anything, req, mock.Anything).Return(mockSeriesResponse(nil), nil) + + store.On("LazyQuery", mock.Anything, mock.Anything). + Return(mockStreamIterator(0, 0), nil) + }, + func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { + ctx := user.InjectOrgID(context.Background(), "test") + resp, err := q.Series(ctx, req) + require.Nil(t, err) + require.Equal(t, &logproto.SeriesResponse{Series: make([]logproto.SeriesIdentifier, 0)}, resp) + }, + }, + { + "returns series", + mkReq([]string{`{a="1"}`}), + func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) { + ingester.On("Series", mock.Anything, req, mock.Anything).Return(mockSeriesResponse([]map[string]string{ + {"a": "1", "b": "2"}, + {"a": "1", "b": "3"}, + }), nil) + + store.On("LazyQuery", mock.Anything, mock.Anything). 
+ Return(mockStreamIterFromLabelSets(0, 10, []string{ + `{a="1",b="4"}`, + `{a="1",b="5"}`, + }), nil) + }, + func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { + ctx := user.InjectOrgID(context.Background(), "test") + resp, err := q.Series(ctx, req) + require.Nil(t, err) + require.ElementsMatch(t, []logproto.SeriesIdentifier{ + {Labels: map[string]string{"a": "1", "b": "2"}}, + {Labels: map[string]string{"a": "1", "b": "3"}}, + {Labels: map[string]string{"a": "1", "b": "4"}}, + {Labels: map[string]string{"a": "1", "b": "5"}}, + }, resp.GetSeries()) + }, + }, + { + "dedupes", + mkReq([]string{`{a="1"}`}), + func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) { + ingester.On("Series", mock.Anything, req, mock.Anything).Return(mockSeriesResponse([]map[string]string{ + {"a": "1", "b": "2"}, + }), nil) + + store.On("LazyQuery", mock.Anything, mock.Anything). + Return(mockStreamIterFromLabelSets(0, 10, []string{ + `{a="1",b="2"}`, + `{a="1",b="3"}`, + }), nil) + }, + func(t *testing.T, q *Querier, req *logproto.SeriesRequest) { + ctx := user.InjectOrgID(context.Background(), "test") + resp, err := q.Series(ctx, req) + require.Nil(t, err) + require.ElementsMatch(t, []logproto.SeriesIdentifier{ + {Labels: map[string]string{"a": "1", "b": "2"}}, + {Labels: map[string]string{"a": "1", "b": "3"}}, + }, resp.GetSeries()) + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + store := newStoreMock() + queryClient := newQueryClientMock() + ingesterClient := newQuerierClientMock() + defaultLimits := defaultLimitsTestConfig() + if tc.setup != nil { + tc.setup(store, queryClient, ingesterClient, defaultLimits, tc.req) + } + + limits, err := validation.NewOverrides(defaultLimits) + require.NoError(t, err) + + q, err := newQuerier( + mockQuerierConfig(), + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + store, limits) + 
require.NoError(t, err) + + tc.run(t, q, tc.req) + }) + } +}