From 6ebaa1f4f7c27b44752feebe1f131b742ad46a39 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 14 Dec 2020 13:17:41 -0500 Subject: [PATCH 1/6] support federated metadata API Signed-off-by: Ben Ye --- cmd/thanos/query.go | 39 + cmd/thanos/sidecar.go | 2 + pkg/api/query/v1.go | 50 +- pkg/metadata/metadata.go | 108 ++ pkg/metadata/metadatapb/custom.go | 28 + pkg/metadata/metadatapb/rpc.pb.go | 1523 +++++++++++++++++++++++++++++ pkg/metadata/metadatapb/rpc.proto | 56 ++ pkg/metadata/prometheus.go | 36 + pkg/metadata/prometheus_test.go | 96 ++ pkg/metadata/proxy.go | 138 +++ pkg/promclient/promclient.go | 22 + pkg/query/storeset.go | 58 +- pkg/query/storeset_test.go | 10 + scripts/genproto.sh | 2 +- test/e2e/compact_test.go | 2 +- test/e2e/e2ethanos/services.go | 6 +- test/e2e/metadata_api_test.go | 95 ++ test/e2e/query_frontend_test.go | 4 +- test/e2e/query_test.go | 17 +- test/e2e/receive_test.go | 10 +- test/e2e/rule_test.go | 6 +- test/e2e/rules_api_test.go | 1 + test/e2e/store_gateway_test.go | 2 +- 23 files changed, 2283 insertions(+), 28 deletions(-) create mode 100644 pkg/metadata/metadata.go create mode 100644 pkg/metadata/metadatapb/custom.go create mode 100644 pkg/metadata/metadatapb/rpc.pb.go create mode 100644 pkg/metadata/metadatapb/rpc.proto create mode 100644 pkg/metadata/prometheus.go create mode 100644 pkg/metadata/prometheus_test.go create mode 100644 pkg/metadata/proxy.go create mode 100644 test/e2e/metadata_api_test.go diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index c562cc2b8d..dbe115a8ea 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -36,6 +36,7 @@ import ( extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/logging" + "github.com/thanos-io/thanos/pkg/metadata" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/rules" @@ -96,6 +97,9 @@ func registerQuery(app *extkingpin.App) { ruleEndpoints := cmd.Flag("rule", "Experimental: Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups."). Hidden().PlaceHolder("").Strings() + metadataEndpoints := cmd.Flag("metadata", "Experimental: Addresses of statically configured metadata API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect metadata API servers through respective DNS lookups."). + Hidden().PlaceHolder("").Strings() + strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top."). PlaceHolder("").Strings() @@ -123,6 +127,9 @@ func registerQuery(app *extkingpin.App) { enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling."). Hidden().Default("true").Bool() + enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling."). + Hidden().Default("true").Bool() + defaultEvaluationInterval := extkingpin.ModelDuration(cmd.Flag("query.default-evaluation-interval", "Set default evaluation interval for sub queries.").Default("1m")) defaultRangeQueryStep := extkingpin.ModelDuration(cmd.Flag("query.default-step", "Set default step for range queries. Default step is only used when step is not set in UI. In such cases, Thanos UI will use default step to calculate resolution (resolution = max(rangeSeconds / 250, defaultStep)). This will not work from Grafana, but Grafana has __step variable which can be used."). @@ -144,6 +151,10 @@ func registerQuery(app *extkingpin.App) { return errors.Errorf("Address %s is duplicated for --rule flag.", dup) } + if dup := firstDuplicate(*metadataEndpoints); dup != "" { + return errors.Errorf("Address %s is duplicated for --target flag.", dup) + } + var fileSD *file.Discovery if len(*fileSDFiles) > 0 { conf := &file.SDConfig{ @@ -195,9 +206,11 @@ func registerQuery(app *extkingpin.App) { getFlagsMap(cmd.Flags()), *stores, *ruleEndpoints, + *metadataEndpoints, *enableAutodownsampling, *enableQueryPartialResponse, *enableRulePartialResponse, + *enableMetricMetadataPartialResponse, fileSD, time.Duration(*dnsSDInterval), *dnsSDResolver, @@ -246,9 +259,11 @@ func runQuery( flagsMap map[string]string, storeAddrs []string, ruleAddrs []string, + metadataAddrs []string, enableAutodownsampling bool, enableQueryPartialResponse bool, enableRulePartialResponse bool, + enableMetricMetadataPartialResponse bool, fileSD *file.Discovery, dnsSDInterval time.Duration, dnsSDResolver string, @@ -288,6 +303,12 @@ func runQuery( dns.ResolverType(dnsSDResolver), ) + dnsMetadataProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_query_metadata_apis_", reg), + dns.ResolverType(dnsSDResolver), + ) + var ( stores = query.NewStoreSet( logger, @@ -314,11 +335,22 @@ func runQuery( return specs }, + func() (specs []query.MetadataSpec) { + for _, addr := range dnsMetadataProvider.Addresses() { + specs = append(specs, query.NewGRPCStoreSpec(addr, false)) + } + + // NOTE(s-urbaniak): No need to remove duplicates, as rule apis are a subset of store apis. + // hence, any duplicates will be tracked in the store api set. + + return specs + }, dialOpts, unhealthyStoreTimeout, ) proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout) rulesProxy = rules.NewProxy(logger, stores.GetRulesClients) + metadataProxy = metadata.NewProxy(logger, stores.GetMetadataClients) queryableCreator = query.NewQueryableCreator( logger, extprom.WrapRegistererWithPrefix("thanos_query_", reg), @@ -381,6 +413,7 @@ func runQuery( if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err) } + // Rules apis do not support file service discovery as of now. case <-ctxUpdate.Done(): return nil @@ -404,6 +437,9 @@ func runQuery( if err := dnsRuleProvider.Resolve(resolveCtx, ruleAddrs); err != nil { level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) } + if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil { + level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) + } return nil }) }, func(error) { @@ -454,9 +490,11 @@ func runQuery( queryableCreator, // NOTE: Will share the same replica label as the query for now. rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels), + metadata.NewGRPCClient(metadataProxy), enableAutodownsampling, enableQueryPartialResponse, enableRulePartialResponse, + enableMetricMetadataPartialResponse, queryReplicaLabels, flagsMap, defaultRangeQueryStep, @@ -497,6 +535,7 @@ func runQuery( s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, grpcserver.WithServer(store.RegisterStoreServer(proxy)), grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)), + grpcserver.WithServer(metadata.RegisterMetadataServer(metadataProxy)), grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 6d38f92c6f..13abbffa6c 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/extprom" thanoshttp "github.com/thanos-io/thanos/pkg/http" + meta "github.com/thanos-io/thanos/pkg/metadata" thanosmodel "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" @@ -218,6 +219,7 @@ func runSidecar( s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, grpcserver.WithServer(store.RegisterStoreServer(promStore)), grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))), + grpcserver.WithServer(meta.RegisterMetadataServer(meta.NewPrometheus(conf.prometheus.url, c))), grpcserver.WithListen(conf.grpc.bindAddress), grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)), grpcserver.WithTLSConfig(tlsCfg), diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 3d04c35508..7c2cc40c9c 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -43,6 +43,8 @@ import ( extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/logging" + "github.com/thanos-io/thanos/pkg/metadata" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/rules" "github.com/thanos-io/thanos/pkg/rules/rulespb" @@ -70,10 +72,12 @@ type QueryAPI struct { // queryEngine returns appropriate promql.Engine for a query with a given step. queryEngine func(int64) *promql.Engine ruleGroups rules.UnaryClient + metadatas metadata.UnaryClient - enableAutodownsampling bool - enableQueryPartialResponse bool - enableRulePartialResponse bool + enableAutodownsampling bool + enableQueryPartialResponse bool + enableRulePartialResponse bool + enableMetricMetadataPartialResponse bool replicaLabels []string storeSet *query.StoreSet @@ -90,9 +94,11 @@ func NewQueryAPI( qe func(int64) *promql.Engine, c query.QueryableCreator, ruleGroups rules.UnaryClient, + metadatas metadata.UnaryClient, enableAutodownsampling bool, enableQueryPartialResponse bool, enableRulePartialResponse bool, + enableMetricMetadataPartialResponse bool, replicaLabels []string, flagsMap map[string]string, defaultRangeQueryStep time.Duration, @@ -107,10 +113,12 @@ func NewQueryAPI( queryableCreate: c, gate: gate, ruleGroups: ruleGroups, + metadatas: metadatas, enableAutodownsampling: enableAutodownsampling, enableQueryPartialResponse: enableQueryPartialResponse, enableRulePartialResponse: enableRulePartialResponse, + enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse, replicaLabels: replicaLabels, storeSet: storeSet, defaultRangeQueryStep: defaultRangeQueryStep, @@ -142,6 +150,8 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge r.Get("/stores", instr("stores", qapi.stores)) r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse))) + + r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse))) } type queryData struct { @@ -630,7 +640,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap return names, warnings, nil } -func (qapi *QueryAPI) stores(r *http.Request) (interface{}, []error, *api.ApiError) { +func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiError) { statuses := make(map[string][]query.StoreStatus) for _, status := range qapi.storeSet.GetStoreStatus() { statuses[status.StoreType.String()] = append(statuses[status.StoreType.String()], status) @@ -790,3 +800,35 @@ func labelValuesByMatchers(sets []storage.SeriesSet, name string) ([]string, sto sort.Strings(labelValues) return labelValues, warnings, nil } + +func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) { + ps := storepb.PartialResponseStrategy_ABORT + if enablePartialResponse { + ps = storepb.PartialResponseStrategy_WARN + } + + return func(r *http.Request) (interface{}, []error, *api.ApiError) { + req := &metadatapb.MetadataRequest{ + // By default we use -1, which means no limit. + Limit: -1, + Metric: r.URL.Query().Get("metric"), + PartialResponseStrategy: ps, + } + + limitStr := r.URL.Query().Get("limit") + if limitStr != "" { + limit, err := strconv.Atoi(limitStr) + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid metric metadata limit='%v'", limit)} + } + req.Limit = int32(limit) + } + + t, warnings, err := client.Metadata(r.Context(), req) + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving metadata")} + } + + return t, warnings, nil + } +} diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go new file mode 100644 index 0000000000..9396b88192 --- /dev/null +++ b/pkg/metadata/metadata.go @@ -0,0 +1,108 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" +) + +var _ UnaryClient = &GRPCClient{} + +// UnaryClient is gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not +// support streaming. +type UnaryClient interface { + Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) +} + +// GRPCClient allows to retrieve metadata from local gRPC streaming server implementation. +// TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available. +type GRPCClient struct { + proxy metadatapb.MetadataServer +} + +func NewGRPCClient(ts metadatapb.MetadataServer) *GRPCClient { + return &GRPCClient{ + proxy: ts, + } +} + +func (rr *GRPCClient) Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) { + srv := &metadataServer{ctx: ctx, metric: req.Metric, limit: int(req.Limit)} + + if req.Limit >= 0 { + if req.Metric != "" { + srv.metadataMap = make(map[string][]metadatapb.Meta, 1) + } else if req.Limit <= 100 { + srv.metadataMap = make(map[string][]metadatapb.Meta, req.Limit) + } else { + srv.metadataMap = make(map[string][]metadatapb.Meta) + } + } else { + srv.metadataMap = make(map[string][]metadatapb.Meta) + } + + if err := rr.proxy.Metadata(req, srv); err != nil { + return nil, nil, errors.Wrap(err, "proxy Metadata") + } + + return srv.metadataMap, srv.warnings, nil +} + +type metadataServer struct { + // This field just exist to pseudo-implement the unused methods of the interface. + metadatapb.Metadata_MetadataServer + ctx context.Context + + metric string + limit int + + warnings []error + metadataMap map[string][]metadatapb.Meta +} + +func (srv *metadataServer) Send(res *metadatapb.MetadataResponse) error { + if res.GetWarning() != "" { + srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) + return nil + } + + if res.GetMetadata() == nil { + return errors.New("no metadata") + } + + // If limit is set to 0, we don't need to add anything. + if srv.limit == 0 { + return nil + } + + for k, v := range res.GetMetadata().Metadata { + if metadata, ok := srv.metadataMap[k]; !ok { + // If limit is set and it is positive, we limit the size of the map. + if srv.limit < 0 || srv.limit > 0 && len(srv.metadataMap) < srv.limit { + srv.metadataMap[k] = v.Metas + } + } else { + // There shouldn't be many metadata for one single metric. + Outer: + for _, meta := range v.Metas { + for _, m := range metadata { + if meta == m { + continue Outer + } + } + srv.metadataMap[k] = append(srv.metadataMap[k], meta) + } + } + } + + return nil +} + +func (srv *metadataServer) Context() context.Context { + return srv.ctx +} diff --git a/pkg/metadata/metadatapb/custom.go b/pkg/metadata/metadatapb/custom.go new file mode 100644 index 0000000000..36f53a0889 --- /dev/null +++ b/pkg/metadata/metadatapb/custom.go @@ -0,0 +1,28 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadatapb + +import ( + "unsafe" +) + +func NewMetadataResponse(metadata *MetricMetadata) *MetadataResponse { + return &MetadataResponse{ + Result: &MetadataResponse_Metadata{ + Metadata: metadata, + }, + } +} + +func NewWarningMetadataResponse(warning error) *MetadataResponse { + return &MetadataResponse{ + Result: &MetadataResponse_Warning{ + Warning: warning.Error(), + }, + } +} + +func FromMetadataMap(m map[string][]Meta) *MetricMetadata { + return &MetricMetadata{Metadata: *(*map[string]MetricMetadataEntry)(unsafe.Pointer(&m))} +} diff --git a/pkg/metadata/metadatapb/rpc.pb.go b/pkg/metadata/metadatapb/rpc.pb.go new file mode 100644 index 0000000000..33f3b09a96 --- /dev/null +++ b/pkg/metadata/metadatapb/rpc.pb.go @@ -0,0 +1,1523 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: metadata/metadatapb/rpc.proto + +package metadatapb + +import ( + context "context" + fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + storepb "github.com/thanos-io/thanos/pkg/store/storepb" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type MetadataRequest struct { + Metric string `protobuf:"bytes,1,opt,name=metric,proto3" json:"metric,omitempty"` + Limit int32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` + PartialResponseStrategy storepb.PartialResponseStrategy `protobuf:"varint,3,opt,name=partial_response_strategy,json=partialResponseStrategy,proto3,enum=thanos.PartialResponseStrategy" json:"partial_response_strategy,omitempty"` +} + +func (m *MetadataRequest) Reset() { *m = MetadataRequest{} } +func (m *MetadataRequest) String() string { return proto.CompactTextString(m) } +func (*MetadataRequest) ProtoMessage() {} +func (*MetadataRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_1d9ae5661e0dc3fc, []int{0} +} +func (m *MetadataRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MetadataRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MetadataRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MetadataRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetadataRequest.Merge(m, src) +} +func (m *MetadataRequest) XXX_Size() int { + return m.Size() +} +func (m *MetadataRequest) XXX_DiscardUnknown() { + xxx_messageInfo_MetadataRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_MetadataRequest proto.InternalMessageInfo + +type MetadataResponse struct { + // Types that are valid to be assigned to Result: + // *MetadataResponse_Metadata + // *MetadataResponse_Warning + Result isMetadataResponse_Result `protobuf_oneof:"result"` +} + +func (m *MetadataResponse) Reset() { *m = MetadataResponse{} } +func (m *MetadataResponse) String() string { return proto.CompactTextString(m) } +func (*MetadataResponse) ProtoMessage() {} +func (*MetadataResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_1d9ae5661e0dc3fc, []int{1} +} +func (m *MetadataResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MetadataResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MetadataResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MetadataResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetadataResponse.Merge(m, src) +} +func (m *MetadataResponse) XXX_Size() int { + return m.Size() +} +func (m *MetadataResponse) XXX_DiscardUnknown() { + xxx_messageInfo_MetadataResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_MetadataResponse proto.InternalMessageInfo + +type isMetadataResponse_Result interface { + isMetadataResponse_Result() + MarshalTo([]byte) (int, error) + Size() int +} + +type MetadataResponse_Metadata struct { + Metadata *MetricMetadata `protobuf:"bytes,1,opt,name=metadata,proto3,oneof" json:"metadata,omitempty"` +} +type MetadataResponse_Warning struct { + Warning string `protobuf:"bytes,2,opt,name=warning,proto3,oneof" json:"warning,omitempty"` +} + +func (*MetadataResponse_Metadata) isMetadataResponse_Result() {} +func (*MetadataResponse_Warning) isMetadataResponse_Result() {} + +func (m *MetadataResponse) GetResult() isMetadataResponse_Result { + if m != nil { + return m.Result + } + return nil +} + +func (m *MetadataResponse) GetMetadata() *MetricMetadata { + if x, ok := m.GetResult().(*MetadataResponse_Metadata); ok { + return x.Metadata + } + return nil +} + +func (m *MetadataResponse) GetWarning() string { + if x, ok := m.GetResult().(*MetadataResponse_Warning); ok { + return x.Warning + } + return "" +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*MetadataResponse) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*MetadataResponse_Metadata)(nil), + (*MetadataResponse_Warning)(nil), + } +} + +type MetricMetadata struct { + Metadata map[string]MetricMetadataEntry `protobuf:"bytes,1,rep,name=metadata,proto3" json:"metadata" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (m *MetricMetadata) Reset() { *m = MetricMetadata{} } +func (m *MetricMetadata) String() string { return proto.CompactTextString(m) } +func (*MetricMetadata) ProtoMessage() {} +func (*MetricMetadata) Descriptor() ([]byte, []int) { + return fileDescriptor_1d9ae5661e0dc3fc, []int{2} +} +func (m *MetricMetadata) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MetricMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MetricMetadata.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MetricMetadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetricMetadata.Merge(m, src) +} +func (m *MetricMetadata) XXX_Size() int { + return m.Size() +} +func (m *MetricMetadata) XXX_DiscardUnknown() { + xxx_messageInfo_MetricMetadata.DiscardUnknown(m) +} + +var xxx_messageInfo_MetricMetadata proto.InternalMessageInfo + +type MetricMetadataEntry struct { + Metas []Meta `protobuf:"bytes,2,rep,name=metas,proto3" json:"metas"` +} + +func (m *MetricMetadataEntry) Reset() { *m = MetricMetadataEntry{} } +func (m *MetricMetadataEntry) String() string { return proto.CompactTextString(m) } +func (*MetricMetadataEntry) ProtoMessage() {} +func (*MetricMetadataEntry) Descriptor() ([]byte, []int) { + return fileDescriptor_1d9ae5661e0dc3fc, []int{3} +} +func (m *MetricMetadataEntry) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MetricMetadataEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MetricMetadataEntry.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MetricMetadataEntry) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetricMetadataEntry.Merge(m, src) +} +func (m *MetricMetadataEntry) XXX_Size() int { + return m.Size() +} +func (m *MetricMetadataEntry) XXX_DiscardUnknown() { + xxx_messageInfo_MetricMetadataEntry.DiscardUnknown(m) +} + +var xxx_messageInfo_MetricMetadataEntry proto.InternalMessageInfo + +type Meta struct { + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type"` + Help string `protobuf:"bytes,2,opt,name=help,proto3" json:"help"` + Unit string `protobuf:"bytes,3,opt,name=unit,proto3" json:"unit"` +} + +func (m *Meta) Reset() { *m = Meta{} } +func (m *Meta) String() string { return proto.CompactTextString(m) } +func (*Meta) ProtoMessage() {} +func (*Meta) Descriptor() ([]byte, []int) { + return fileDescriptor_1d9ae5661e0dc3fc, []int{4} +} +func (m *Meta) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Meta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Meta.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Meta) XXX_Merge(src proto.Message) { + xxx_messageInfo_Meta.Merge(m, src) +} +func (m *Meta) XXX_Size() int { + return m.Size() +} +func (m *Meta) XXX_DiscardUnknown() { + xxx_messageInfo_Meta.DiscardUnknown(m) +} + +var xxx_messageInfo_Meta proto.InternalMessageInfo + +func init() { + proto.RegisterType((*MetadataRequest)(nil), "thanos.MetadataRequest") + proto.RegisterType((*MetadataResponse)(nil), "thanos.MetadataResponse") + proto.RegisterType((*MetricMetadata)(nil), "thanos.MetricMetadata") + proto.RegisterMapType((map[string]MetricMetadataEntry)(nil), "thanos.MetricMetadata.MetadataEntry") + proto.RegisterType((*MetricMetadataEntry)(nil), "thanos.MetricMetadataEntry") + proto.RegisterType((*Meta)(nil), "thanos.Meta") +} + +func init() { proto.RegisterFile("metadata/metadatapb/rpc.proto", fileDescriptor_1d9ae5661e0dc3fc) } + +var fileDescriptor_1d9ae5661e0dc3fc = []byte{ + // 463 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xf6, 0xe6, 0x8f, 0x74, 0x02, 0xa5, 0x5a, 0xaa, 0xd6, 0x35, 0xe0, 0x44, 0x16, 0x07, 0x9f, + 0x62, 0x30, 0x1c, 0x10, 0x97, 0x4a, 0x91, 0x40, 0x95, 0x50, 0x25, 0xb4, 0x5c, 0x10, 0x1c, 0xca, + 0xa6, 0xac, 0x52, 0x0b, 0xc7, 0x5e, 0x76, 0xd7, 0x20, 0xbf, 0x05, 0x0f, 0xc0, 0x53, 0xf0, 0x14, + 0x39, 0xf6, 0xc8, 0xa9, 0x82, 0xe4, 0xc6, 0x53, 0xa0, 0xdd, 0xb5, 0x5b, 0x47, 0xf8, 0x32, 0xfa, + 0x66, 0xbe, 0xcf, 0x33, 0x9f, 0x66, 0xc7, 0xf0, 0x70, 0xc9, 0x14, 0xfd, 0x44, 0x15, 0x8d, 0x6a, + 0xc0, 0xe7, 0x91, 0xe0, 0xe7, 0x53, 0x2e, 0x72, 0x95, 0xe3, 0x81, 0xba, 0xa0, 0x59, 0x2e, 0xbd, + 0x23, 0xa9, 0x72, 0xc1, 0x22, 0x13, 0xf9, 0x3c, 0x52, 0x25, 0x67, 0xd2, 0x4a, 0xbc, 0xfd, 0x45, + 0xbe, 0xc8, 0x0d, 0x8c, 0x34, 0xb2, 0xd5, 0xe0, 0x07, 0x82, 0xbb, 0xa7, 0x55, 0x47, 0xc2, 0xbe, + 0x14, 0x4c, 0x2a, 0x7c, 0x00, 0x83, 0x25, 0x53, 0x22, 0x39, 0x77, 0xd1, 0x04, 0x85, 0x3b, 0xa4, + 0xca, 0xf0, 0x3e, 0xf4, 0xd3, 0x64, 0x99, 0x28, 0xb7, 0x33, 0x41, 0x61, 0x9f, 0xd8, 0x04, 0x7f, + 0x80, 0x23, 0x4e, 0x85, 0x4a, 0x68, 0x7a, 0x26, 0x98, 0xe4, 0x79, 0x26, 0xd9, 0x99, 0x54, 0x82, + 0x2a, 0xb6, 0x28, 0xdd, 0xee, 0x04, 0x85, 0xbb, 0xf1, 0x78, 0x6a, 0xed, 0x4d, 0xdf, 0x58, 0x21, + 0xa9, 0x74, 0x6f, 0x2b, 0x19, 0x39, 0xe4, 0xed, 0x44, 0x90, 0xc1, 0xde, 0x8d, 0x3b, 0xcb, 0xe1, + 0x67, 0x30, 0xac, 0x77, 0x60, 0x0c, 0x8e, 0xe2, 0x83, 0xba, 0xff, 0xa9, 0x31, 0x5a, 0x7f, 0x71, + 0xe2, 0x90, 0x6b, 0x25, 0xf6, 0xe0, 0xd6, 0x37, 0x2a, 0xb2, 0x24, 0x5b, 0x18, 0xfb, 0x3b, 0x27, + 0x0e, 0xa9, 0x0b, 0xb3, 0x21, 0x0c, 0x04, 0x93, 0x45, 0xaa, 0x82, 0x9f, 0x08, 0x76, 0xb7, 0x9b, + 0xe0, 0x57, 0x5b, 0xe3, 0xba, 0xe1, 0x28, 0x7e, 0xd4, 0x3e, 0x6e, 0x5a, 0x83, 0x97, 0x99, 0x12, + 0xe5, 0xac, 0xb7, 0xba, 0x1a, 0x37, 0x0c, 0x78, 0xef, 0xe0, 0xce, 0x96, 0x00, 0xef, 0x41, 0xf7, + 0x33, 0x2b, 0xab, 0x1d, 0x6b, 0x88, 0x9f, 0x40, 0xff, 0x2b, 0x4d, 0x0b, 0x66, 0x1c, 0x8e, 0xe2, + 0xfb, 0xed, 0x73, 0xcc, 0xd7, 0xc4, 0x2a, 0x5f, 0x74, 0x9e, 0xa3, 0xe0, 0x18, 0xee, 0xb5, 0x28, + 0x70, 0x08, 0x7d, 0x3d, 0x5c, 0xba, 0x1d, 0xe3, 0xfa, 0x76, 0xa3, 0x1b, 0xad, 0xdc, 0x59, 0x41, + 0xf0, 0x11, 0x7a, 0xba, 0x88, 0x1f, 0x40, 0x4f, 0x5f, 0x8c, 0xb5, 0x34, 0x1b, 0xfe, 0xbd, 0x1a, + 0x9b, 0x9c, 0x98, 0xa8, 0xd9, 0x0b, 0x96, 0x72, 0xbb, 0x3e, 0xcb, 0xea, 0x9c, 0x98, 0xa8, 0xd9, + 0x22, 0x4b, 0x94, 0x79, 0xf1, 0x8a, 0xd5, 0x39, 0x31, 0x31, 0x7e, 0x0d, 0xc3, 0xeb, 0x85, 0x1e, + 0x37, 0xf0, 0x61, 0xd3, 0x54, 0xe3, 0x06, 0x3d, 0xf7, 0x7f, 0xc2, 0x3e, 0xff, 0x63, 0x34, 0x0b, + 0x57, 0x7f, 0x7c, 0x67, 0xb5, 0xf6, 0xd1, 0xe5, 0xda, 0x47, 0xbf, 0xd7, 0x3e, 0xfa, 0xbe, 0xf1, + 0x9d, 0xcb, 0x8d, 0xef, 0xfc, 0xda, 0xf8, 0xce, 0x7b, 0xb8, 0xf9, 0x41, 0xe6, 0x03, 0x73, 0xe4, + 0x4f, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0x08, 0x77, 0xe4, 0x56, 0x3e, 0x03, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// MetadataClient is the client API for Metadata service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type MetadataClient interface { + Metadata(ctx context.Context, in *MetadataRequest, opts ...grpc.CallOption) (Metadata_MetadataClient, error) +} + +type metadataClient struct { + cc *grpc.ClientConn +} + +func NewMetadataClient(cc *grpc.ClientConn) MetadataClient { + return &metadataClient{cc} +} + +func (c *metadataClient) Metadata(ctx context.Context, in *MetadataRequest, opts ...grpc.CallOption) (Metadata_MetadataClient, error) { + stream, err := c.cc.NewStream(ctx, &_Metadata_serviceDesc.Streams[0], "/thanos.Metadata/Metadata", opts...) + if err != nil { + return nil, err + } + x := &metadataMetadataClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Metadata_MetadataClient interface { + Recv() (*MetadataResponse, error) + grpc.ClientStream +} + +type metadataMetadataClient struct { + grpc.ClientStream +} + +func (x *metadataMetadataClient) Recv() (*MetadataResponse, error) { + m := new(MetadataResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// MetadataServer is the server API for Metadata service. +type MetadataServer interface { + Metadata(*MetadataRequest, Metadata_MetadataServer) error +} + +// UnimplementedMetadataServer can be embedded to have forward compatible implementations. +type UnimplementedMetadataServer struct { +} + +func (*UnimplementedMetadataServer) Metadata(req *MetadataRequest, srv Metadata_MetadataServer) error { + return status.Errorf(codes.Unimplemented, "method Metadata not implemented") +} + +func RegisterMetadataServer(s *grpc.Server, srv MetadataServer) { + s.RegisterService(&_Metadata_serviceDesc, srv) +} + +func _Metadata_Metadata_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(MetadataRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(MetadataServer).Metadata(m, &metadataMetadataServer{stream}) +} + +type Metadata_MetadataServer interface { + Send(*MetadataResponse) error + grpc.ServerStream +} + +type metadataMetadataServer struct { + grpc.ServerStream +} + +func (x *metadataMetadataServer) Send(m *MetadataResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _Metadata_serviceDesc = grpc.ServiceDesc{ + ServiceName: "thanos.Metadata", + HandlerType: (*MetadataServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Metadata", + Handler: _Metadata_Metadata_Handler, + ServerStreams: true, + }, + }, + Metadata: "metadata/metadatapb/rpc.proto", +} + +func (m *MetadataRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MetadataRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetadataRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.PartialResponseStrategy != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.PartialResponseStrategy)) + i-- + dAtA[i] = 0x18 + } + if m.Limit != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.Limit)) + i-- + dAtA[i] = 0x10 + } + if len(m.Metric) > 0 { + i -= len(m.Metric) + copy(dAtA[i:], m.Metric) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Metric))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *MetadataResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MetadataResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetadataResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Result != nil { + { + size := m.Result.Size() + i -= size + if _, err := m.Result.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *MetadataResponse_Metadata) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetadataResponse_Metadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Metadata != nil { + { + size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} +func (m *MetadataResponse_Warning) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetadataResponse_Warning) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Warning) + copy(dAtA[i:], m.Warning) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Warning))) + i-- + dAtA[i] = 0x12 + return len(dAtA) - i, nil +} +func (m *MetricMetadata) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MetricMetadata) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetricMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Metadata) > 0 { + for k := range m.Metadata { + v := m.Metadata[k] + baseI := i + { + size, err := (&v).MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + i -= len(k) + copy(dAtA[i:], k) + i = encodeVarintRpc(dAtA, i, uint64(len(k))) + i-- + dAtA[i] = 0xa + i = encodeVarintRpc(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *MetricMetadataEntry) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MetricMetadataEntry) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetricMetadataEntry) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Metas) > 0 { + for iNdEx := len(m.Metas) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Metas[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + return len(dAtA) - i, nil +} + +func (m *Meta) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Meta) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Meta) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Unit) > 0 { + i -= len(m.Unit) + copy(dAtA[i:], m.Unit) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Unit))) + i-- + dAtA[i] = 0x1a + } + if len(m.Help) > 0 { + i -= len(m.Help) + copy(dAtA[i:], m.Help) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Help))) + i-- + dAtA[i] = 0x12 + } + if len(m.Type) > 0 { + i -= len(m.Type) + copy(dAtA[i:], m.Type) + i = encodeVarintRpc(dAtA, i, uint64(len(m.Type))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { + offset -= sovRpc(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *MetadataRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Metric) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + if m.Limit != 0 { + n += 1 + sovRpc(uint64(m.Limit)) + } + if m.PartialResponseStrategy != 0 { + n += 1 + sovRpc(uint64(m.PartialResponseStrategy)) + } + return n +} + +func (m *MetadataResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Result != nil { + n += m.Result.Size() + } + return n +} + +func (m *MetadataResponse_Metadata) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Metadata != nil { + l = m.Metadata.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} +func (m *MetadataResponse_Warning) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Warning) + n += 1 + l + sovRpc(uint64(l)) + return n +} +func (m *MetricMetadata) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Metadata) > 0 { + for k, v := range m.Metadata { + _ = k + _ = v + l = v.Size() + mapEntrySize := 1 + len(k) + sovRpc(uint64(len(k))) + 1 + l + sovRpc(uint64(l)) + n += mapEntrySize + 1 + sovRpc(uint64(mapEntrySize)) + } + } + return n +} + +func (m *MetricMetadataEntry) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Metas) > 0 { + for _, e := range m.Metas { + l = e.Size() + n += 1 + l + sovRpc(uint64(l)) + } + } + return n +} + +func (m *Meta) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Type) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + l = len(m.Help) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + l = len(m.Unit) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + return n +} + +func sovRpc(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozRpc(x uint64) (n int) { + return sovRpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *MetadataRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MetadataRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetadataRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metric", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Metric = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Limit", wireType) + } + m.Limit = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Limit |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PartialResponseStrategy", wireType) + } + m.PartialResponseStrategy = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PartialResponseStrategy |= storepb.PartialResponseStrategy(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *MetadataResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MetadataResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetadataResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &MetricMetadata{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Result = &MetadataResponse_Metadata{v} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Warning", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Result = &MetadataResponse_Warning{string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *MetricMetadata) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MetricMetadata: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetricMetadata: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Metadata == nil { + m.Metadata = make(map[string]MetricMetadataEntry) + } + var mapkey string + mapvalue := &MetricMetadataEntry{} + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthRpc + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthRpc + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthRpc + } + postmsgIndex := iNdEx + mapmsglen + if postmsgIndex < 0 { + return ErrInvalidLengthRpc + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &MetricMetadataEntry{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.Metadata[mapkey] = *mapvalue + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *MetricMetadataEntry) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MetricMetadataEntry: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetricMetadataEntry: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metas", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Metas = append(m.Metas, Meta{}) + if err := m.Metas[len(m.Metas)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Meta) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Meta: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Meta: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Type = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Help = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Unit = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipRpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthRpc + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupRpc + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthRpc + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthRpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRpc = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupRpc = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/metadata/metadatapb/rpc.proto b/pkg/metadata/metadatapb/rpc.proto new file mode 100644 index 0000000000..f7ab97e720 --- /dev/null +++ b/pkg/metadata/metadatapb/rpc.proto @@ -0,0 +1,56 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +syntax = "proto3"; +package thanos; + +import "store/storepb/types.proto"; +import "gogoproto/gogo.proto"; + +option go_package = "metadatapb"; + +option (gogoproto.sizer_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +// Do not generate XXX fields to reduce memory footprint and opening a door +// for zero-copy casts to/from prometheus data types. +option (gogoproto.goproto_unkeyed_all) = false; +option (gogoproto.goproto_unrecognized_all) = false; +option (gogoproto.goproto_sizecache_all) = false; + +service Metadata { + rpc Metadata(MetadataRequest) returns (stream MetadataResponse); +} + +message MetadataRequest { + string metric = 1; + int32 limit = 2; + PartialResponseStrategy partial_response_strategy = 3; +} + +message MetadataResponse { + oneof result { + /// group for rule groups. It is up to server implementation to decide how many of those to put here within single frame. + MetricMetadata metadata = 1; + + /// warning is considered an information piece in place of series for warning purposes. + /// It is used to warn rule API users about suspicious cases or partial response (if enabled). + string warning = 2; + } +} + +message MetricMetadata { + map metadata = 1 [(gogoproto.nullable) = false]; +} + +message MetricMetadataEntry { + repeated Meta metas = 2 [(gogoproto.nullable) = false]; +} + +message Meta { + string type = 1 [(gogoproto.jsontag) = "type"]; + string help = 2 [(gogoproto.jsontag) = "help"]; + string unit = 3 [(gogoproto.jsontag) = "unit"]; +} diff --git a/pkg/metadata/prometheus.go b/pkg/metadata/prometheus.go new file mode 100644 index 0000000000..40be80ffcb --- /dev/null +++ b/pkg/metadata/prometheus.go @@ -0,0 +1,36 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "net/url" + + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" + "github.com/thanos-io/thanos/pkg/promclient" +) + +// Prometheus implements metadatapb.Metadata gRPC that allows to fetch metric metadata from Prometheus HTTP /api/v1/metadata endpoint. +type Prometheus struct { + base *url.URL + client *promclient.Client +} + +// NewPrometheus creates a new metadata.Prometheus. +func NewPrometheus(base *url.URL, client *promclient.Client) *Prometheus { + return &Prometheus{ + base: base, + client: client, + } +} + +// Metadata returns all specified metric metadata from Prometheus. +func (p *Prometheus) Metadata(r *metadatapb.MetadataRequest, s metadatapb.Metadata_MetadataServer) error { + md, err := p.client.MetadataInGRPC(s.Context(), p.base, r.Metric, int(r.Limit)) + if err != nil { + return err + } + + return s.Send(&metadatapb.MetadataResponse{Result: &metadatapb.MetadataResponse_Metadata{ + Metadata: metadatapb.FromMetadataMap(md)}}) +} diff --git a/pkg/metadata/prometheus_test.go b/pkg/metadata/prometheus_test.go new file mode 100644 index 0000000000..d87457959c --- /dev/null +++ b/pkg/metadata/prometheus_test.go @@ -0,0 +1,96 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "context" + "net/url" + "testing" + "time" + + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/e2eutil" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +func TestPrometheus_Metadata_e2e(t *testing.T) { + p, err := e2eutil.NewPrometheus() + testutil.Ok(t, err) + defer func() { testutil.Ok(t, p.Stop()) }() + + testutil.Ok(t, p.SetConfig(` +global: + external_labels: + region: eu-west +scrape_configs: +- job_name: 'myself' + # Quick scrapes for test purposes. + scrape_interval: 1s + scrape_timeout: 1s + static_configs: + - targets: ['localhost:9090','localhost:80'] +`)) + testutil.Ok(t, p.Start()) + + time.Sleep(10 * time.Second) + + u, err := url.Parse("http://" + p.Addr()) + testutil.Ok(t, err) + + prom := NewPrometheus(u, promclient.NewDefaultClient()) + + for _, tcase := range []struct { + name string + metric string + limit int32 + expectedFunc func(map[string][]metadatapb.Meta) bool + }{ + { + name: "all metadata return", + limit: -1, + expectedFunc: func(m map[string][]metadatapb.Meta) bool { + return len(m["thanos_build_info"]) > 0 && len(m["prometheus_build_info"]) > 0 + }, + }, + { + name: "no metadata return", + limit: 0, + expectedFunc: func(m map[string][]metadatapb.Meta) bool { + return len(m) == 0 + }, + }, + { + name: "only 1 metadata return", + limit: 1, + expectedFunc: func(m map[string][]metadatapb.Meta) bool { + return len(m) == 1 + }, + }, + { + name: "only thanos_build_info metadata return", + metric: "thanos_build_info", + limit: 1, + expectedFunc: func(m map[string][]metadatapb.Meta) bool { + return len(m) == 1 && len(m["thanos_build_info"]) > 0 + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + meta, w, err := NewGRPCClient(prom).Metadata(context.Background(), &metadatapb.MetadataRequest{ + Metric: tcase.metric, + Limit: tcase.limit, + }) + testutil.Equals(t, storage.Warnings(nil), w) + testutil.Ok(t, err) + testutil.Assert(t, true, tcase.expectedFunc(meta)) + }) + } +} diff --git a/pkg/metadata/proxy.go b/pkg/metadata/proxy.go new file mode 100644 index 0000000000..8046053582 --- /dev/null +++ b/pkg/metadata/proxy.go @@ -0,0 +1,138 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "context" + "io" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// Proxy implements metadatapb.Metadata gRPC that fanouts requests to given metadatapb.Metadata and deduplication on the way. +type Proxy struct { + logger log.Logger + metadata func() []metadatapb.MetadataClient +} + +func RegisterMetadataServer(metadataSrv metadatapb.MetadataServer) func(*grpc.Server) { + return func(s *grpc.Server) { + metadatapb.RegisterMetadataServer(s, metadataSrv) + } +} + +// NewProxy returns a new metadata.Proxy. +func NewProxy(logger log.Logger, metadata func() []metadatapb.MetadataClient) *Proxy { + return &Proxy{ + logger: logger, + metadata: metadata, + } +} + +func (s *Proxy) Metadata(req *metadatapb.MetadataRequest, srv metadatapb.Metadata_MetadataServer) error { + var ( + g, gctx = errgroup.WithContext(srv.Context()) + respChan = make(chan *metadatapb.MetricMetadata, 10) + metas []*metadatapb.MetricMetadata + ) + + for _, metadataClient := range s.metadata() { + rs := &metadataStream{ + client: metadataClient, + request: req, + channel: respChan, + server: srv, + } + g.Go(func() error { return rs.receive(gctx) }) + } + + go func() { + _ = g.Wait() + close(respChan) + }() + + for resp := range respChan { + metas = append(metas, resp) + } + + if err := g.Wait(); err != nil { + level.Error(s.logger).Log("err", err) + return err + } + + for _, t := range metas { + if err := srv.Send(metadatapb.NewMetadataResponse(t)); err != nil { + return status.Error(codes.Unknown, errors.Wrap(err, "send metadata response").Error()) + } + } + + return nil +} + +type metadataStream struct { + client metadatapb.MetadataClient + request *metadatapb.MetadataRequest + channel chan<- *metadatapb.MetricMetadata + server metadatapb.Metadata_MetadataServer +} + +func (stream *metadataStream) receive(ctx context.Context) error { + metadataCli, err := stream.client.Metadata(ctx, stream.request) + if err != nil { + err = errors.Wrapf(err, "fetching metadata from metadata client %v", stream.client) + + if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { + return err + } + + if serr := stream.server.Send(metadatapb.NewWarningMetadataResponse(err)); serr != nil { + return serr + } + // Not an error if response strategy is warning. + return nil + } + + for { + resp, err := metadataCli.Recv() + if err == io.EOF { + return nil + } + + if err != nil { + err = errors.Wrapf(err, "receiving metadata from metadata client %v", stream.client) + + if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { + return err + } + + if err := stream.server.Send(metadatapb.NewWarningMetadataResponse(err)); err != nil { + return errors.Wrapf(err, "sending metadata error to server %v", stream.server) + } + + continue + } + + if w := resp.GetWarning(); w != "" { + if err := stream.server.Send(metadatapb.NewWarningMetadataResponse(errors.New(w))); err != nil { + return errors.Wrapf(err, "sending metadata warning to server %v", stream.server) + } + continue + } + + select { + case stream.channel <- resp.GetMetadata(): + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 754291345f..58fb296a03 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -736,3 +737,24 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin } return m.Data.Groups, nil } + +func (c *Client) MetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) { + u := *base + u.Path = path.Join(u.Path, "/api/v1/metadata") + q := u.Query() + + if metric != "" { + q.Add("metric", metric) + } + // We only set limit when it is >= 0. + if limit >= 0 { + q.Add("limit", strconv.Itoa(limit)) + } + + u.RawQuery = q.Encode() + + var v struct { + Data map[string][]metadatapb.Meta `json:"data"` + } + return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/metadata HTTP[client]", &u, &v) +} diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index d4f3304d3e..462f5d10bf 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -16,13 +16,14 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" - "github.com/thanos-io/thanos/pkg/store/labelpb" "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -49,6 +50,11 @@ type RuleSpec interface { Addr() string } +type MetadataSpec interface { + // Addr returns RulesAPI Address for the rules spec. It is used as its ID. + Addr() string +} + // stringError forces the error to be a string // when marshaled into a JSON. type stringError struct { @@ -181,6 +187,7 @@ type StoreSet struct { // accessible and we close gRPC client for it. storeSpecs func() []StoreSpec ruleSpecs func() []RuleSpec + metadataSpecs func() []MetadataSpec dialOpts []grpc.DialOption gRPCInfoCallTimeout time.Duration @@ -203,6 +210,7 @@ func NewStoreSet( reg *prometheus.Registry, storeSpecs func() []StoreSpec, ruleSpecs func() []RuleSpec, + metadataSpecs func() []MetadataSpec, dialOpts []grpc.DialOption, unhealthyStoreTimeout time.Duration, ) *StoreSet { @@ -220,11 +228,15 @@ func NewStoreSet( if ruleSpecs == nil { ruleSpecs = func() []RuleSpec { return nil } } + if metadataSpecs == nil { + metadataSpecs = func() []MetadataSpec { return nil } + } ss := &StoreSet{ logger: log.With(logger, "component", "storeset"), storeSpecs: storeSpecs, ruleSpecs: ruleSpecs, + metadataSpecs: metadataSpecs, dialOpts: dialOpts, storesMetric: storesMetric, gRPCInfoCallTimeout: 5 * time.Second, @@ -243,7 +255,8 @@ type storeRef struct { cc *grpc.ClientConn addr string // If rule is not nil, then this store also supports rules API. - rule rulespb.RulesClient + rule rulespb.RulesClient + metadata metadatapb.MetadataClient // Meta (can change during runtime). labelSets []labels.Labels @@ -254,7 +267,7 @@ type storeRef struct { logger log.Logger } -func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, rule rulespb.RulesClient) { +func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int64, storeType component.StoreAPI, rule rulespb.RulesClient, metadata metadatapb.MetadataClient) { s.mtx.Lock() defer s.mtx.Unlock() @@ -263,6 +276,7 @@ func (s *storeRef) Update(labelSets []labels.Labels, minTime int64, maxTime int6 s.minTime = minTime s.maxTime = maxTime s.rule = rule + s.metadata = metadata } func (s *storeRef) StoreType() component.StoreAPI { @@ -279,6 +293,13 @@ func (s *storeRef) HasRulesAPI() bool { return s.rule != nil } +func (s *storeRef) HasMetadataAPI() bool { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.metadata != nil +} + func (s *storeRef) LabelSets() []labels.Labels { s.mtx.RLock() defer s.mtx.RUnlock() @@ -402,8 +423,9 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store mtx sync.Mutex wg sync.WaitGroup - storeAddrSet = make(map[string]struct{}) - ruleAddrSet = make(map[string]struct{}) + storeAddrSet = make(map[string]struct{}) + ruleAddrSet = make(map[string]struct{}) + metadataAddrSet = make(map[string]struct{}) ) // Gather active stores map concurrently. Build new store if does not exist already. @@ -411,6 +433,11 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store ruleAddrSet[ruleSpec.Addr()] = struct{}{} } + // Gather active stores map concurrently. Build new store if does not exist already. + for _, metadataSpec := range s.metadataSpecs() { + metadataAddrSet[metadataSpec.Addr()] = struct{}{} + } + // Gather healthy stores map concurrently. Build new store if does not exist already. for _, storeSpec := range s.storeSpecs() { if _, ok := storeAddrSet[storeSpec.Addr()]; ok { @@ -446,6 +473,11 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store rule = rulespb.NewRulesClient(st.cc) } + var metadata metadatapb.MetadataClient + if _, ok := metadataAddrSet[addr]; ok { + metadata = metadatapb.NewMetadataClient(st.cc) + } + // Check existing or new store. Is it healthy? What are current metadata? labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient) if err != nil { @@ -470,7 +502,7 @@ func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*store } s.updateStoreStatus(st, nil) - st.Update(labelSets, minTime, maxTime, storeType, rule) + st.Update(labelSets, minTime, maxTime, storeType, rule, metadata) mtx.Lock() defer mtx.Unlock() @@ -554,6 +586,20 @@ func (s *StoreSet) GetRulesClients() []rulespb.RulesClient { return rules } +// GetRulesClients returns a list of all active rules clients. +func (s *StoreSet) GetMetadataClients() []metadatapb.MetadataClient { + s.storesMtx.RLock() + defer s.storesMtx.RUnlock() + + metadataClients := make([]metadatapb.MetadataClient, 0, len(s.stores)) + for _, st := range s.stores { + if st.HasMetadataAPI() { + metadataClients = append(metadataClients, st.metadata) + } + } + return metadataClients +} + func (s *StoreSet) Close() { s.storesMtx.Lock() defer s.storesMtx.Unlock() diff --git a/pkg/query/storeset_test.go b/pkg/query/storeset_test.go index 9429a6e44c..7de48faafb 100644 --- a/pkg/query/storeset_test.go +++ b/pkg/query/storeset_test.go @@ -196,6 +196,9 @@ func TestStoreSet_Update(t *testing.T) { func() (specs []RuleSpec) { return nil }, + func() (specs []MetadataSpec) { + return nil + }, testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second defer storeSet.Close() @@ -545,6 +548,7 @@ func TestStoreSet_Update_NoneAvailable(t *testing.T) { return specs }, func() (specs []RuleSpec) { return nil }, + func() (specs []MetadataSpec) { return nil }, testGRPCOpts, time.Minute) storeSet.gRPCInfoCallTimeout = 2 * time.Second @@ -629,6 +633,8 @@ func TestQuerierStrict(t *testing.T) { } }, func() []RuleSpec { return nil + }, func() (specs []MetadataSpec) { + return nil }, testGRPCOpts, time.Minute) defer storeSet.Close() storeSet.gRPCInfoCallTimeout = 1 * time.Second @@ -758,6 +764,7 @@ func TestStoreSet_Update_Rules(t *testing.T) { storeSet := NewStoreSet(nil, nil, tc.storeSpecs, tc.ruleSpecs, + func() []MetadataSpec { return nil }, testGRPCOpts, time.Minute) t.Run(tc.name, func(t *testing.T) { @@ -930,6 +937,9 @@ func TestStoreSet_Rules_Discovery(t *testing.T) { return tc.states[currentState].ruleSpecs() }, + func() []MetadataSpec { + return nil + }, testGRPCOpts, time.Minute) defer storeSet.Close() diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 66b016ecca..83f1f7da90 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -25,7 +25,7 @@ PATH=${PATH}:/tmp/protobin GOGOPROTO_ROOT="$(GO111MODULE=on go list -modfile=.bingo/protoc-gen-gogofast.mod -f '{{ .Dir }}' -m github.com/gogo/protobuf)" GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf" -DIRS="store/storepb/ store/storepb/prompb/ store/labelpb rules/rulespb store/hintspb queryfrontend" +DIRS="store/storepb/ store/storepb/prompb/ store/labelpb rules/rulespb store/hintspb queryfrontend metadata/metadatapb" echo "generating code" pushd "pkg" for dir in ${DIRS}; do diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index bfba33da5c..560558775a 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -433,7 +433,7 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{str.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{str.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 8f692883aa..87a24acb53 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -114,7 +114,7 @@ func NewPrometheusWithSidecar(sharedDir string, netName string, name string, con return prom, sidecar, nil } -func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ruleAddresses []string, routePrefix, externalPrefix string) (*Service, error) { +func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ruleAddresses, metadataAddresses []string, routePrefix, externalPrefix string) (*Service, error) { const replicaLabel = "replica" args := e2e.BuildArgs(map[string]string{ @@ -136,6 +136,10 @@ func NewQuerier(sharedDir, name string, storeAddresses, fileSDStoreAddresses, ru args = append(args, "--rule="+addr) } + for _, addr := range metadataAddresses { + args = append(args, "--metadata="+addr) + } + if len(fileSDStoreAddresses) > 0 { queryFileSDDir := filepath.Join(sharedDir, "data", "querier", name) container := filepath.Join(e2e.ContainerSharedDir, "data", "querier", name) diff --git a/test/e2e/metadata_api_test.go b/test/e2e/metadata_api_test.go new file mode 100644 index 0000000000..d0e2897c5e --- /dev/null +++ b/test/e2e/metadata_api_test.go @@ -0,0 +1,95 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package e2e_test + +import ( + "context" + "testing" + "time" + + "github.com/cortexproject/cortex/integration/e2e" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/test/e2e/e2ethanos" +) + +func TestMetadataAPI_Fanout(t *testing.T) { + t.Parallel() + + netName := "e2e_test_metadata_fanout" + + s, err := e2e.NewScenario(netName) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + // 2x Prometheus. + // Each Prometheus scrapes its own metrics and Sidecar's metrics. + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar( + s.SharedDir(), + netName, + "prom1", + defaultPromConfig("ha", 0, "", "", "localhost:9090", "sidecar-prom1:8080"), + e2ethanos.DefaultPrometheusImage(), + ) + testutil.Ok(t, err) + + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar( + s.SharedDir(), + netName, + "prom2", + defaultPromConfig("ha", 1, "", "", "localhost:9090", "sidecar-prom1:8080"), + e2ethanos.DefaultPrometheusImage(), + ) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) + + q, err := e2ethanos.NewQuerier( + s.SharedDir(), + "query", + []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()}, + nil, + nil, + []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()}, + "", + "", + ) + + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_query_metadata_apis_dns_provider_results"}, e2e.WaitMissingMetrics)) + + promMeta, err := promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+prom1.HTTPEndpoint()), "", -1) + testutil.Ok(t, err) + + thanosMeta, err := promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", -1) + testutil.Ok(t, err) + + // Metadata response from Prometheus and Thanos Querier should be the same after deduplication. + testutil.Equals(t, thanosMeta, promMeta) + + // We only expect to see one metadata returned. + thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 1) + testutil.Ok(t, err) + testutil.Assert(t, true, len(thanosMeta) == 1) + + // We only expect to see ten metadata returned. + thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 10) + testutil.Ok(t, err) + testutil.Assert(t, true, len(thanosMeta) == 10) + + // No metadata returned. + thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 0) + testutil.Ok(t, err) + testutil.Assert(t, true, len(thanosMeta) == 0) + + // Only prometheus_build_info metric will be returned. + thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "prometheus_build_info", -1) + testutil.Ok(t, err) + testutil.Assert(t, true, len(thanosMeta) == 1 && len(thanosMeta["prometheus_build_info"]) > 0) +} diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 9dff09f998..27dd9b123d 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -34,7 +34,7 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom, sidecar)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -388,7 +388,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom, sidecar)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index e36b0c7952..00215e01f6 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -38,7 +38,11 @@ const queryUpWithoutInstance = "sum(up) without (instance)" // * expose 2 external labels, source and replica. // * scrape fake target. This will produce up == 0 metric which we can assert on. // * optionally remote write endpoint to write into. -func defaultPromConfig(name string, replica int, remoteWriteEndpoint, ruleFile string) string { +func defaultPromConfig(name string, replica int, remoteWriteEndpoint, ruleFile string, scrapeTargets ...string) string { + targets := "localhost:9090" + if len(scrapeTargets) > 0 { + targets = strings.Join(scrapeTargets, ",") + } config := fmt.Sprintf(` global: external_labels: @@ -50,8 +54,8 @@ scrape_configs: scrape_interval: 1s scrape_timeout: 1s static_configs: - - targets: ['localhost:9090'] -`, name, replica) + - targets: [%s] +`, name, replica, targets) if remoteWriteEndpoint != "" { config = fmt.Sprintf(` @@ -104,7 +108,7 @@ func TestQuery(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4)) // Querier. Both fileSD and directly by flags. - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, []string{sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()}, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, []string{sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()}, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -184,6 +188,7 @@ func TestQueryExternalPrefixWithoutReverseProxy(t *testing.T) { nil, nil, nil, + nil, "", externalPrefix, ) @@ -207,6 +212,7 @@ func TestQueryExternalPrefix(t *testing.T) { nil, nil, nil, + nil, "", externalPrefix, ) @@ -236,6 +242,7 @@ func TestQueryExternalPrefixAndRoutePrefix(t *testing.T) { nil, nil, nil, + nil, routePrefix, externalPrefix, ) @@ -273,6 +280,7 @@ func TestQueryLabelNames(t *testing.T) { []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, []string{}, nil, + nil, "", "", ) @@ -331,6 +339,7 @@ func TestQueryLabelValues(t *testing.T) { []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}, []string{}, nil, + nil, "", "", ) diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index a92b5baf2b..9aa599f40c 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -102,7 +102,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -178,7 +178,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -253,7 +253,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -325,7 +325,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -400,7 +400,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1)) testutil.Ok(t, s.StartAndWaitReady(prom2)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 10d6a1b7a4..0417ca6e29 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -291,7 +291,7 @@ func TestRule_AlertmanagerHTTPClient(t *testing.T) { { EndpointsConfig: http_util.EndpointsConfig{ StaticAddresses: func() []string { - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", nil, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", nil, nil, nil, nil, "", "") testutil.Ok(t, err) return []string{q.NetworkHTTPEndpointFor(s.NetworkName())} }(), @@ -302,7 +302,7 @@ func TestRule_AlertmanagerHTTPClient(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -383,7 +383,7 @@ func TestRule(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r)) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index d67c775e3a..67d18c2b41 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -69,6 +69,7 @@ func TestRulesAPI_Fanout(t *testing.T) { []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, nil, []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, + nil, "", "", ) diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 31ab0e30fa..332b54eadf 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -61,7 +61,7 @@ func TestStoreGateway(t *testing.T) { // Ensure bucket UI. ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(s1.HTTPEndpoint(), "loaded")) - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{s1.GRPCNetworkEndpoint()}, nil, nil, "", "") + q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", []string{s1.GRPCNetworkEndpoint()}, nil, nil, nil, "", "") testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) From d18d9bb5e6cb6e788dddcba16fcbfdd24be8d0b0 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 3 Feb 2021 20:34:42 -0500 Subject: [PATCH 2/6] update comments Signed-off-by: yeya24 --- cmd/thanos/query.go | 4 ++-- pkg/query/storeset.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index dbe115a8ea..9b0228047f 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -152,7 +152,7 @@ func registerQuery(app *extkingpin.App) { } if dup := firstDuplicate(*metadataEndpoints); dup != "" { - return errors.Errorf("Address %s is duplicated for --target flag.", dup) + return errors.Errorf("Address %s is duplicated for --metadata flag.", dup) } var fileSD *file.Discovery @@ -438,7 +438,7 @@ func runQuery( level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) } if err := dnsMetadataProvider.Resolve(resolveCtx, metadataAddrs); err != nil { - level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err) + level.Error(logger).Log("msg", "failed to resolve addresses for metadataAPIs", "err", err) } return nil }) diff --git a/pkg/query/storeset.go b/pkg/query/storeset.go index 462f5d10bf..b014f683e3 100644 --- a/pkg/query/storeset.go +++ b/pkg/query/storeset.go @@ -51,7 +51,7 @@ type RuleSpec interface { } type MetadataSpec interface { - // Addr returns RulesAPI Address for the rules spec. It is used as its ID. + // Addr returns MetadataAPI Address for the metadata spec. It is used as its ID. Addr() string } @@ -586,7 +586,7 @@ func (s *StoreSet) GetRulesClients() []rulespb.RulesClient { return rules } -// GetRulesClients returns a list of all active rules clients. +// GetMetadataClients returns a list of all active metadata clients. func (s *StoreSet) GetMetadataClients() []metadatapb.MetadataClient { s.storesMtx.RLock() defer s.storesMtx.RUnlock() From 810929e2662155cf613c7ef25eac03188f29fa20 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Sun, 14 Feb 2021 14:15:56 -0500 Subject: [PATCH 3/6] use parseInt Signed-off-by: yeya24 --- pkg/api/query/v1.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 7c2cc40c9c..7c7d3fcde3 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -817,7 +817,7 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse limitStr := r.URL.Query().Get("limit") if limitStr != "" { - limit, err := strconv.Atoi(limitStr) + limit, err := strconv.ParseInt(limitStr, 10, 64) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid metric metadata limit='%v'", limit)} } From 1c10b72c9c831d56a936720e5651b2077a33b49b Mon Sep 17 00:00:00 2001 From: yeya24 Date: Tue, 23 Feb 2021 14:00:12 -0500 Subject: [PATCH 4/6] address Prem's comments Signed-off-by: yeya24 --- cmd/thanos/query.go | 3 --- pkg/api/query/v1.go | 2 +- test/e2e/metadata_api_test.go | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 9b0228047f..e1d7be2e99 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -340,9 +340,6 @@ func runQuery( specs = append(specs, query.NewGRPCStoreSpec(addr, false)) } - // NOTE(s-urbaniak): No need to remove duplicates, as rule apis are a subset of store apis. - // hence, any duplicates will be tracked in the store api set. - return specs }, dialOpts, diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 7c7d3fcde3..227c6ba821 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -817,7 +817,7 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse limitStr := r.URL.Query().Get("limit") if limitStr != "" { - limit, err := strconv.ParseInt(limitStr, 10, 64) + limit, err := strconv.ParseInt(limitStr, 10, 32) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid metric metadata limit='%v'", limit)} } diff --git a/test/e2e/metadata_api_test.go b/test/e2e/metadata_api_test.go index d0e2897c5e..6211525f0c 100644 --- a/test/e2e/metadata_api_test.go +++ b/test/e2e/metadata_api_test.go @@ -38,7 +38,7 @@ func TestMetadataAPI_Fanout(t *testing.T) { s.SharedDir(), netName, "prom2", - defaultPromConfig("ha", 1, "", "", "localhost:9090", "sidecar-prom1:8080"), + defaultPromConfig("ha", 1, "", "", "localhost:9090", "sidecar-prom2:8080"), e2ethanos.DefaultPrometheusImage(), ) testutil.Ok(t, err) From 046a53c360040f56e2cae5ddc183ac365ab1cda4 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 24 Feb 2021 17:29:17 -0500 Subject: [PATCH 5/6] update proto comment Signed-off-by: yeya24 --- pkg/metadata/metadatapb/rpc.proto | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/metadata/metadatapb/rpc.proto b/pkg/metadata/metadatapb/rpc.proto index f7ab97e720..16f0706692 100644 --- a/pkg/metadata/metadatapb/rpc.proto +++ b/pkg/metadata/metadatapb/rpc.proto @@ -32,11 +32,11 @@ message MetadataRequest { message MetadataResponse { oneof result { - /// group for rule groups. It is up to server implementation to decide how many of those to put here within single frame. + /// A collection of metric metadata entries. MetricMetadata metadata = 1; /// warning is considered an information piece in place of series for warning purposes. - /// It is used to warn rule API users about suspicious cases or partial response (if enabled). + /// It is used to warn metadata API users about suspicious cases or partial response (if enabled). string warning = 2; } } From 3f7e42e36b2890c092dfc92e211a46053bf3331c Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 24 Feb 2021 17:52:58 -0500 Subject: [PATCH 6/6] add changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d03e4cc8d..40d498afaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3700](https://github.com/thanos-io/thanos/pull/3700) ui: make old bucket viewer UI work with vanilla Prometheus blocks - [#2641](https://github.com/thanos-io/thanos/issues/2641) Query Frontend: Added `--query-range.request-downsampled` flag enabling additional queries for downsampled data in case of empty or incomplete response to range request. - [#3792](https://github.com/thanos-io/thanos/pull/3792) Receiver: Added `--tsdb.allow-overlapping-blocks` flag to allow overlapping tsdb blocks and enable vertical compaction +- [#3686](https://github.com/thanos-io/thanos/pull/3686) Query: Added federated metric metadata support. ### Fixed