diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 758c91a97ad..a41194c7db8 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -782,12 +782,12 @@ func runQuery( info.WithStoreInfoFunc(func() *infopb.StoreInfo { if httpProbe.IsReady() { mint, maxt := proxy.TimeRange() - return &infopb.StoreInfo{ MinTime: mint, MaxTime: maxt, SupportsSharding: true, SupportsWithoutReplicaLabels: true, + TsdbInfos: proxy.TSDBInfos(), } } return nil diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index cc01e74a17d..2c2181523b7 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -342,6 +342,7 @@ func runReceive( MaxTime: maxTime, SupportsSharding: true, SupportsWithoutReplicaLabels: true, + TsdbInfos: proxy.TSDBInfos(), } } return nil diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 6edf95416c1..6a57a55296e 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -632,6 +632,7 @@ func runRule( MaxTime: maxt, SupportsSharding: true, SupportsWithoutReplicaLabels: true, + TsdbInfos: tsdbStore.TSDBInfos(), } } return nil diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 8fd399232e3..ddd24104ed0 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -272,6 +272,7 @@ func runSidecar( MaxTime: maxt, SupportsSharding: true, SupportsWithoutReplicaLabels: true, + TsdbInfos: promStore.TSDBInfos(), } } return nil diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index ea484e67fca..8661e95c253 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -438,6 +438,7 @@ func runStore( MaxTime: maxt, SupportsSharding: true, SupportsWithoutReplicaLabels: true, + TsdbInfos: bs.TSDBInfos(), } } return nil diff --git a/pkg/info/infopb/custom.pb.go b/pkg/info/infopb/custom.pb.go new file mode 100644 index 00000000000..8d50ee2c4b9 --- /dev/null +++ b/pkg/info/infopb/custom.pb.go @@ -0,0 +1,40 @@ +package infopb + +import ( + "math" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/pkg/store/labelpb" +) + +func NewTSDBInfo(mint, maxt int64, lbls []labelpb.ZLabel) TSDBInfo { + return TSDBInfo{ + Labels: labelpb.ZLabelSet{ + Labels: lbls, + }, + MinTime: mint, + MaxTime: maxt, + } +} + +type TSDBInfos []TSDBInfo + +func (infos TSDBInfos) MaxT() int64 { + var maxt int64 = math.MinInt64 + for _, info := range infos { + if info.MaxTime > maxt { + maxt = info.MaxTime + } + } + return maxt +} + +func (infos TSDBInfos) LabelSets() []labels.Labels { + lsets := make([]labels.Labels, 0, len(infos)) + for _, info := range infos { + lsets = append(lsets, labelpb.ZLabelsToPromLabels(info.Labels.Labels)) + + } + return lsets +} diff --git a/pkg/info/infopb/rpc.pb.go b/pkg/info/infopb/rpc.pb.go index 6153dbb4cf5..c88be63cfc1 100644 --- a/pkg/info/infopb/rpc.pb.go +++ b/pkg/info/infopb/rpc.pb.go @@ -122,6 +122,8 @@ type StoreInfo struct { SupportsSharding bool `protobuf:"varint,3,opt,name=supports_sharding,json=supportsSharding,proto3" json:"supports_sharding,omitempty"` // replica_aware means this store supports without_replica_labels of StoreAPI.Series. SupportsWithoutReplicaLabels bool `protobuf:"varint,5,opt,name=supports_without_replica_labels,json=supportsWithoutReplicaLabels,proto3" json:"supports_without_replica_labels,omitempty"` + // TSDBInfos holds metadata for all TSDBs exposed by the store. + TsdbInfos []TSDBInfo `protobuf:"bytes,6,rep,name=tsdb_infos,json=tsdbInfos,proto3" json:"tsdb_infos"` } func (m *StoreInfo) Reset() { *m = StoreInfo{} } @@ -344,6 +346,45 @@ func (m *QueryAPIInfo) XXX_DiscardUnknown() { var xxx_messageInfo_QueryAPIInfo proto.InternalMessageInfo +type TSDBInfo struct { + Labels labelpb.ZLabelSet `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` + MinTime int64 `protobuf:"varint,2,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` + MaxTime int64 `protobuf:"varint,3,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` +} + +func (m *TSDBInfo) Reset() { *m = TSDBInfo{} } +func (m *TSDBInfo) String() string { return proto.CompactTextString(m) } +func (*TSDBInfo) ProtoMessage() {} +func (*TSDBInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a1214ec45d2bf952, []int{8} +} +func (m *TSDBInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TSDBInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TSDBInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TSDBInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_TSDBInfo.Merge(m, src) +} +func (m *TSDBInfo) XXX_Size() int { + return m.Size() +} +func (m *TSDBInfo) XXX_DiscardUnknown() { + xxx_messageInfo_TSDBInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_TSDBInfo proto.InternalMessageInfo + func init() { proto.RegisterType((*InfoRequest)(nil), "thanos.info.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.info.InfoResponse") @@ -353,46 +394,50 @@ func init() { proto.RegisterType((*TargetsInfo)(nil), "thanos.info.TargetsInfo") proto.RegisterType((*ExemplarsInfo)(nil), "thanos.info.ExemplarsInfo") proto.RegisterType((*QueryAPIInfo)(nil), "thanos.info.QueryAPIInfo") + proto.RegisterType((*TSDBInfo)(nil), "thanos.info.TSDBInfo") } func init() { proto.RegisterFile("info/infopb/rpc.proto", fileDescriptor_a1214ec45d2bf952) } var fileDescriptor_a1214ec45d2bf952 = []byte{ - // 533 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xdf, 0x6a, 0xdb, 0x30, - 0x14, 0xc6, 0xe3, 0xe6, 0xbf, 0xd2, 0x74, 0xad, 0xe8, 0x86, 0x13, 0x86, 0x13, 0x4c, 0x2f, 0x02, - 0x1b, 0x31, 0x64, 0x30, 0x06, 0xbb, 0x5a, 0x4b, 0x60, 0x1d, 0x2b, 0x6c, 0x4e, 0x60, 0xd0, 0x1b, - 0xa3, 0xa4, 0x6a, 0x62, 0xb0, 0x2d, 0x55, 0x92, 0x59, 0xf2, 0x16, 0x7b, 0x95, 0x5d, 0xef, 0x05, - 0x72, 0xd9, 0xcb, 0x5d, 0x8d, 0x2d, 0x79, 0x91, 0xa1, 0x23, 0x27, 0x8b, 0x59, 0xaf, 0x7a, 0x93, - 0x48, 0xe7, 0xfb, 0x7d, 0xc7, 0xd2, 0x39, 0x3a, 0xe8, 0x69, 0x98, 0xdc, 0x32, 0x4f, 0xff, 0xf0, - 0x89, 0x27, 0xf8, 0xb4, 0xcf, 0x05, 0x53, 0x0c, 0x37, 0xd4, 0x9c, 0x24, 0x4c, 0xf6, 0xb5, 0xd0, - 0x6e, 0x49, 0xc5, 0x04, 0xf5, 0x22, 0x32, 0xa1, 0x11, 0x9f, 0x78, 0x6a, 0xc9, 0xa9, 0x34, 0x5c, - 0xfb, 0x74, 0xc6, 0x66, 0x0c, 0x96, 0x9e, 0x5e, 0x99, 0xa8, 0xdb, 0x44, 0x8d, 0xcb, 0xe4, 0x96, - 0xf9, 0xf4, 0x2e, 0xa5, 0x52, 0xb9, 0xdf, 0x8b, 0xe8, 0xd0, 0xec, 0x25, 0x67, 0x89, 0xa4, 0xf8, - 0x35, 0x42, 0x90, 0x2c, 0x90, 0x54, 0x49, 0xdb, 0xea, 0x16, 0x7b, 0x8d, 0xc1, 0x49, 0x3f, 0xfb, - 0xe4, 0xf5, 0x47, 0x2d, 0x8d, 0xa8, 0x3a, 0x2f, 0xad, 0x7e, 0x75, 0x0a, 0x7e, 0x3d, 0xca, 0xf6, - 0x12, 0x9f, 0xa1, 0xe6, 0x05, 0x8b, 0x39, 0x4b, 0x68, 0xa2, 0xc6, 0x4b, 0x4e, 0xed, 0x83, 0xae, - 0xd5, 0xab, 0xfb, 0xf9, 0x20, 0x7e, 0x89, 0xca, 0x70, 0x60, 0xbb, 0xd8, 0xb5, 0x7a, 0x8d, 0xc1, - 0xb3, 0xfe, 0xde, 0x5d, 0xfa, 0x23, 0xad, 0xc0, 0x61, 0x0c, 0xa4, 0x69, 0x91, 0x46, 0x54, 0xda, - 0xa5, 0x07, 0x68, 0x5f, 0x2b, 0x86, 0x06, 0x08, 0xbf, 0x47, 0x4f, 0x62, 0xaa, 0x44, 0x38, 0x0d, - 0x62, 0xaa, 0xc8, 0x0d, 0x51, 0xc4, 0x2e, 0x83, 0xaf, 0x93, 0xf3, 0x5d, 0x01, 0x73, 0x95, 0x21, - 0x90, 0xe0, 0x28, 0xce, 0xc5, 0xf0, 0x00, 0x55, 0x15, 0x11, 0x33, 0x5d, 0x80, 0x0a, 0x64, 0xb0, - 0x73, 0x19, 0xc6, 0x46, 0x03, 0xeb, 0x16, 0xc4, 0x6f, 0x50, 0x9d, 0x2e, 0x68, 0xcc, 0x23, 0x22, - 0xa4, 0x5d, 0x05, 0x57, 0x3b, 0xe7, 0x1a, 0x6e, 0x55, 0xf0, 0xfd, 0x83, 0xb1, 0x87, 0xca, 0x77, - 0x29, 0x15, 0x4b, 0xbb, 0x06, 0xae, 0x56, 0xce, 0xf5, 0x59, 0x2b, 0xef, 0x3e, 0x5d, 0x9a, 0x8b, - 0x02, 0xe7, 0xfe, 0xb0, 0x50, 0x7d, 0x57, 0x2b, 0xdc, 0x42, 0xb5, 0x38, 0x4c, 0x02, 0x15, 0xc6, - 0xd4, 0xb6, 0xba, 0x56, 0xaf, 0xe8, 0x57, 0xe3, 0x30, 0x19, 0x87, 0x31, 0x05, 0x89, 0x2c, 0x8c, - 0x74, 0x90, 0x49, 0x64, 0x01, 0xd2, 0x0b, 0x74, 0x22, 0x53, 0xce, 0x99, 0x50, 0x32, 0x90, 0x73, - 0x22, 0x6e, 0xc2, 0x64, 0x06, 0x4d, 0xa9, 0xf9, 0xc7, 0x5b, 0x61, 0x94, 0xc5, 0xf1, 0x10, 0x75, - 0x76, 0xf0, 0xd7, 0x50, 0xcd, 0x59, 0xaa, 0x02, 0x41, 0x79, 0x14, 0x4e, 0x49, 0x00, 0x2f, 0x40, - 0x42, 0xa5, 0x6b, 0xfe, 0xf3, 0x2d, 0xf6, 0xc5, 0x50, 0xbe, 0x81, 0xe0, 0xd5, 0xc8, 0x0f, 0xa5, - 0x5a, 0xe9, 0xb8, 0xec, 0x36, 0x50, 0x7d, 0xd7, 0x3a, 0xf7, 0x14, 0xe1, 0xff, 0xfb, 0xa1, 0xdf, - 0xe8, 0x5e, 0x8d, 0xdd, 0x21, 0x6a, 0xe6, 0x8a, 0xf7, 0xb8, 0x2b, 0xbb, 0x47, 0xe8, 0x70, 0xbf, - 0x9a, 0x83, 0x0b, 0x54, 0x82, 0x6c, 0x6f, 0xb3, 0xff, 0x7c, 0x93, 0xf7, 0x86, 0xa4, 0xdd, 0x7a, - 0x40, 0x31, 0xe3, 0x72, 0x7e, 0xb6, 0xfa, 0xe3, 0x14, 0x56, 0x6b, 0xc7, 0xba, 0x5f, 0x3b, 0xd6, - 0xef, 0xb5, 0x63, 0x7d, 0xdb, 0x38, 0x85, 0xfb, 0x8d, 0x53, 0xf8, 0xb9, 0x71, 0x0a, 0xd7, 0x15, - 0x33, 0xbc, 0x93, 0x0a, 0xcc, 0xde, 0xab, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x7f, 0x1d, 0x6e, - 0xa7, 0xd2, 0x03, 0x00, 0x00, + // 589 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xdf, 0x8a, 0xda, 0x40, + 0x14, 0xc6, 0x8d, 0x7f, 0xe3, 0x71, 0xdd, 0xee, 0x0e, 0xbb, 0x25, 0x4a, 0x89, 0x12, 0xf6, 0x42, + 0x68, 0x31, 0x60, 0xa1, 0x94, 0xf6, 0xaa, 0x6e, 0x85, 0x6e, 0xe9, 0x42, 0x1b, 0x85, 0xc2, 0xde, + 0x84, 0xa8, 0xb3, 0x1a, 0x48, 0x32, 0x63, 0x66, 0xa4, 0xfa, 0x16, 0x7d, 0x95, 0xbe, 0x85, 0x97, + 0x7b, 0xd9, 0xab, 0xd2, 0xea, 0x43, 0xf4, 0xb6, 0xcc, 0x4c, 0x62, 0x0d, 0xdd, 0xbd, 0xe9, 0x8d, + 0x66, 0xe6, 0xfb, 0x9d, 0xc9, 0x39, 0xdf, 0x39, 0x13, 0x38, 0xf7, 0xa3, 0x5b, 0x62, 0x8b, 0x1f, + 0x3a, 0xb6, 0x63, 0x3a, 0xe9, 0xd2, 0x98, 0x70, 0x82, 0x6a, 0x7c, 0xee, 0x45, 0x84, 0x75, 0x85, + 0xd0, 0x6c, 0x30, 0x4e, 0x62, 0x6c, 0x07, 0xde, 0x18, 0x07, 0x74, 0x6c, 0xf3, 0x35, 0xc5, 0x4c, + 0x71, 0xcd, 0xb3, 0x19, 0x99, 0x11, 0xf9, 0x68, 0x8b, 0x27, 0xb5, 0x6b, 0xd5, 0xa1, 0x76, 0x15, + 0xdd, 0x12, 0x07, 0x2f, 0x96, 0x98, 0x71, 0xeb, 0x5b, 0x01, 0x8e, 0xd4, 0x9a, 0x51, 0x12, 0x31, + 0x8c, 0x5e, 0x00, 0xc8, 0xc3, 0x5c, 0x86, 0x39, 0x33, 0xb4, 0x76, 0xa1, 0x53, 0xeb, 0x9d, 0x76, + 0x93, 0x57, 0xde, 0x7c, 0x10, 0xd2, 0x10, 0xf3, 0x7e, 0x71, 0xf3, 0xa3, 0x95, 0x73, 0xaa, 0x41, + 0xb2, 0x66, 0xe8, 0x02, 0xea, 0x97, 0x24, 0xa4, 0x24, 0xc2, 0x11, 0x1f, 0xad, 0x29, 0x36, 0xf2, + 0x6d, 0xad, 0x53, 0x75, 0xb2, 0x9b, 0xe8, 0x19, 0x94, 0x64, 0xc2, 0x46, 0xa1, 0xad, 0x75, 0x6a, + 0xbd, 0xc7, 0xdd, 0x83, 0x5a, 0xba, 0x43, 0xa1, 0xc8, 0x64, 0x14, 0x24, 0xe8, 0x78, 0x19, 0x60, + 0x66, 0x14, 0xef, 0xa1, 0x1d, 0xa1, 0x28, 0x5a, 0x42, 0xe8, 0x1d, 0x3c, 0x0a, 0x31, 0x8f, 0xfd, + 0x89, 0x1b, 0x62, 0xee, 0x4d, 0x3d, 0xee, 0x19, 0x25, 0x19, 0xd7, 0xca, 0xc4, 0x5d, 0x4b, 0xe6, + 0x3a, 0x41, 0xe4, 0x01, 0xc7, 0x61, 0x66, 0x0f, 0xf5, 0xa0, 0xc2, 0xbd, 0x78, 0x26, 0x0c, 0x28, + 0xcb, 0x13, 0x8c, 0xcc, 0x09, 0x23, 0xa5, 0xc9, 0xd0, 0x14, 0x44, 0x2f, 0xa1, 0x8a, 0x57, 0x38, + 0xa4, 0x81, 0x17, 0x33, 0xa3, 0x22, 0xa3, 0x9a, 0x99, 0xa8, 0x41, 0xaa, 0xca, 0xb8, 0xbf, 0x30, + 0xb2, 0xa1, 0xb4, 0x58, 0xe2, 0x78, 0x6d, 0xe8, 0x32, 0xaa, 0x91, 0x89, 0xfa, 0x24, 0x94, 0x37, + 0x1f, 0xaf, 0x54, 0xa1, 0x92, 0xb3, 0x7e, 0x6b, 0x50, 0xdd, 0x7b, 0x85, 0x1a, 0xa0, 0x87, 0x7e, + 0xe4, 0x72, 0x3f, 0xc4, 0x86, 0xd6, 0xd6, 0x3a, 0x05, 0xa7, 0x12, 0xfa, 0xd1, 0xc8, 0x0f, 0xb1, + 0x94, 0xbc, 0x95, 0x92, 0xf2, 0x89, 0xe4, 0xad, 0xa4, 0xf4, 0x14, 0x4e, 0xd9, 0x92, 0x52, 0x12, + 0x73, 0xe6, 0xb2, 0xb9, 0x17, 0x4f, 0xfd, 0x68, 0x26, 0x9b, 0xa2, 0x3b, 0x27, 0xa9, 0x30, 0x4c, + 0xf6, 0xd1, 0x00, 0x5a, 0x7b, 0xf8, 0x8b, 0xcf, 0xe7, 0x64, 0xc9, 0xdd, 0x18, 0xd3, 0xc0, 0x9f, + 0x78, 0xae, 0x9c, 0x00, 0x26, 0x9d, 0xd6, 0x9d, 0x27, 0x29, 0xf6, 0x59, 0x51, 0x8e, 0x82, 0xe4, + 0xd4, 0x30, 0xf4, 0x0a, 0x80, 0xb3, 0xe9, 0xd8, 0x15, 0x85, 0x09, 0x67, 0xc5, 0x68, 0x9d, 0x67, + 0x9d, 0x1d, 0xbe, 0xed, 0x8b, 0xa2, 0xd2, 0xf1, 0x12, 0xb8, 0x58, 0xb3, 0xf7, 0x45, 0xbd, 0x78, + 0x52, 0xb2, 0x6a, 0x50, 0xdd, 0xb7, 0xdd, 0x3a, 0x03, 0xf4, 0x6f, 0x2f, 0xc5, 0x7c, 0x1f, 0xf4, + 0xc7, 0x1a, 0x40, 0x3d, 0x63, 0xfc, 0xff, 0xd9, 0x65, 0x1d, 0xc3, 0xd1, 0x61, 0x27, 0xac, 0x05, + 0xe8, 0x69, 0xae, 0xc8, 0x86, 0x72, 0x62, 0x82, 0x26, 0x1b, 0xf8, 0xe0, 0x6d, 0x49, 0xb0, 0x4c, + 0x0a, 0xf9, 0x87, 0x53, 0x28, 0x64, 0x52, 0xe8, 0x5d, 0x42, 0x51, 0xbe, 0xee, 0x75, 0xf2, 0x9f, + 0x9d, 0xc9, 0x83, 0x3b, 0xdd, 0x6c, 0xdc, 0xa3, 0xa8, 0xdb, 0xdd, 0xbf, 0xd8, 0xfc, 0x32, 0x73, + 0x9b, 0xad, 0xa9, 0xdd, 0x6d, 0x4d, 0xed, 0xe7, 0xd6, 0xd4, 0xbe, 0xee, 0xcc, 0xdc, 0xdd, 0xce, + 0xcc, 0x7d, 0xdf, 0x99, 0xb9, 0x9b, 0xb2, 0xfa, 0xd6, 0x8c, 0xcb, 0xf2, 0x53, 0xf1, 0xfc, 0x4f, + 0x00, 0x00, 0x00, 0xff, 0xff, 0xf1, 0x5f, 0x0a, 0x2f, 0x81, 0x04, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -636,6 +681,20 @@ func (m *StoreInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.TsdbInfos) > 0 { + for iNdEx := len(m.TsdbInfos) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.TsdbInfos[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x32 + } + } if m.SupportsWithoutReplicaLabels { i-- if m.SupportsWithoutReplicaLabels { @@ -794,6 +853,49 @@ func (m *QueryAPIInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *TSDBInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TSDBInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TSDBInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.MaxTime != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.MaxTime)) + i-- + dAtA[i] = 0x18 + } + if m.MinTime != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.MinTime)) + i-- + dAtA[i] = 0x10 + } + { + size, err := m.Labels.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} + func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { offset -= sovRpc(v) base := offset @@ -875,6 +977,12 @@ func (m *StoreInfo) Size() (n int) { if m.SupportsWithoutReplicaLabels { n += 2 } + if len(m.TsdbInfos) > 0 { + for _, e := range m.TsdbInfos { + l = e.Size() + n += 1 + l + sovRpc(uint64(l)) + } + } return n } @@ -929,6 +1037,23 @@ func (m *QueryAPIInfo) Size() (n int) { return n } +func (m *TSDBInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.Labels.Size() + n += 1 + l + sovRpc(uint64(l)) + if m.MinTime != 0 { + n += 1 + sovRpc(uint64(m.MinTime)) + } + if m.MaxTime != 0 { + n += 1 + sovRpc(uint64(m.MaxTime)) + } + return n +} + func sovRpc(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -1424,6 +1549,40 @@ func (m *StoreInfo) Unmarshal(dAtA []byte) error { } } m.SupportsWithoutReplicaLabels = bool(v != 0) + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TsdbInfos", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TsdbInfos = append(m.TsdbInfos, TSDBInfo{}) + if err := m.TsdbInfos[len(m.TsdbInfos)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -1733,6 +1892,127 @@ func (m *QueryAPIInfo) Unmarshal(dAtA []byte) error { } return nil } +func (m *TSDBInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TSDBInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TSDBInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Labels.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MinTime", wireType) + } + m.MinTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MinTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxTime", wireType) + } + m.MaxTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaxTime |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipRpc(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/info/infopb/rpc.proto b/pkg/info/infopb/rpc.proto index 90cecd58905..9f0db3709da 100644 --- a/pkg/info/infopb/rpc.proto +++ b/pkg/info/infopb/rpc.proto @@ -61,6 +61,9 @@ message StoreInfo { // replica_aware means this store supports without_replica_labels of StoreAPI.Series. bool supports_without_replica_labels = 5; + + // TSDBInfos holds metadata for all TSDBs exposed by the store. + repeated TSDBInfo tsdb_infos = 6 [(gogoproto.nullable) = false]; } // RulesInfo holds the metadata related to Rules API exposed by the component. @@ -84,3 +87,10 @@ message ExemplarsInfo { // QueryInfo holds the metadata related to Query API exposed by the component. message QueryAPIInfo { } + +message TSDBInfo { + ZLabelSet labels = 1 [(gogoproto.nullable) = false]; + + int64 min_time = 2; + int64 max_time = 3; +} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 826d3e3cfb9..ad963edd3f5 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -527,9 +527,8 @@ func (e *EndpointSet) GetQueryAPIClients() []Client { queryClients := make([]Client, 0, len(endpoints)) for _, er := range endpoints { if er.HasQueryAPI() { - mint, maxt := er.timeRange() client := querypb.NewQueryClient(er.cc) - queryClients = append(queryClients, NewClient(client, er.addr, mint, maxt, er.labelSets())) + queryClients = append(queryClients, NewClient(client, er.addr, er.TSDBInfos())) } } return queryClients @@ -796,6 +795,18 @@ func (er *endpointRef) TimeRange() (mint, maxt int64) { return er.timeRange() } +func (er *endpointRef) TSDBInfos() []infopb.TSDBInfo { + er.mtx.RLock() + defer er.mtx.RUnlock() + + if er.metadata == nil || er.metadata.Store == nil { + return nil + } + + // Currently, min/max time of only StoreAPI is being updated by all components. + return er.metadata.Store.TsdbInfos +} + func (er *endpointRef) timeRange() (int64, int64) { if er.metadata == nil || er.metadata.Store == nil { return math.MinInt64, math.MaxInt64 diff --git a/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go b/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go index 0e78a4ccd9e..ddf413bd8dc 100644 --- a/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go +++ b/pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -197,6 +198,8 @@ func (s *storeRef) LabelSets() []labels.Labels { return s.labelSets } +func (s *storeRef) TSDBInfos() []infopb.TSDBInfo { return nil } + func (s *storeRef) TimeRange() (int64, int64) { s.mtx.RLock() defer s.mtx.RUnlock() diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go index fc7c0b6ba34..86077734ba4 100644 --- a/pkg/query/remote_engine.go +++ b/pkg/query/remote_engine.go @@ -6,6 +6,7 @@ package query import ( "context" "io" + "math" "time" "github.com/go-kit/log" @@ -18,6 +19,7 @@ import ( "github.com/thanos-community/promql-engine/api" "github.com/thanos-io/thanos/pkg/api/query/querypb" + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -34,23 +36,23 @@ type Opts struct { type Client struct { querypb.QueryClient address string - mint int64 - maxt int64 - labelSets []labels.Labels + tsdbInfos infopb.TSDBInfos } // NewClient creates a new Client. -func NewClient(queryClient querypb.QueryClient, address string, mint int64, maxt int64, labelSets []labels.Labels) Client { - return Client{QueryClient: queryClient, address: address, mint: mint, maxt: maxt, labelSets: labelSets} +func NewClient(queryClient querypb.QueryClient, address string, tsdbInfos infopb.TSDBInfos) Client { + return Client{ + QueryClient: queryClient, + address: address, + tsdbInfos: tsdbInfos, + } } -func (q Client) MinT() int64 { return q.mint } - -func (q Client) MaxT() int64 { return q.maxt } +func (c Client) GetAddress() string { return c.address } -func (q Client) LabelSets() []labels.Labels { return q.labelSets } - -func (q Client) GetAddress() string { return q.address } +func (c Client) LabelSets() []labels.Labels { + return c.tsdbInfos.LabelSets() +} type remoteEndpoints struct { logger log.Logger @@ -76,46 +78,93 @@ func (r remoteEndpoints) Engines() []api.RemoteEngine { } type remoteEngine struct { - Client opts Opts logger log.Logger + + client Client } func newRemoteEngine(logger log.Logger, queryClient Client, opts Opts) api.RemoteEngine { return &remoteEngine{ logger: logger, - Client: queryClient, + client: queryClient, opts: opts, } } +// MinT returns the minimum timestamp that is safe to query in the remote engine. +// In order to calculate it, we find the highest min time for each label set, and we return +// the lowest of those values. +// Calculating the MinT this way makes remote queries resilient to cases where one tsdb replica would delete +// a block due to retention before other replicas did the same. +// See https://github.com/thanos-community/promql-engine/issues/187. +func (r remoteEngine) MinT() int64 { + var ( + hashBuf = make([]byte, 0, 128) + highestMintByLabelSet = make(map[uint64]int64) + ) + for _, lset := range r.infosWithoutReplicaLabels() { + key, _ := labelpb.ZLabelsToPromLabels(lset.Labels.Labels).HashWithoutLabels(hashBuf) + lsetMinT, ok := highestMintByLabelSet[key] + if !ok { + highestMintByLabelSet[key] = lset.MinTime + continue + } + + if lset.MinTime > lsetMinT { + highestMintByLabelSet[key] = lset.MinTime + } + } + + var mint int64 = math.MaxInt64 + for _, m := range highestMintByLabelSet { + if m < mint { + mint = m + } + } + + return mint +} + +func (r remoteEngine) MaxT() int64 { + return r.client.tsdbInfos.MaxT() +} + func (r remoteEngine) LabelSets() []labels.Labels { + return r.infosWithoutReplicaLabels().LabelSets() +} + +func (r remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos { replicaLabelSet := make(map[string]struct{}) for _, lbl := range r.opts.ReplicaLabels { replicaLabelSet[lbl] = struct{}{} } // Strip replica labels from the result. - labelSets := r.Client.LabelSets() - result := make([]labels.Labels, 0, len(labelSets)) - for _, labelSet := range labelSets { - var builder labels.ScratchBuilder - for _, lbl := range labelSet { + infos := make(infopb.TSDBInfos, 0, len(r.client.tsdbInfos)) + var builder labels.ScratchBuilder + for _, info := range r.client.tsdbInfos { + builder.Reset() + for _, lbl := range info.Labels.Labels { if _, ok := replicaLabelSet[lbl.Name]; ok { continue } builder.Add(lbl.Name, lbl.Value) } - result = append(result, builder.Labels()) + infos = append(infos, infopb.NewTSDBInfo( + info.MinTime, + info.MaxTime, + labelpb.ZLabelsFromPromLabels(builder.Labels())), + ) } - return result + return infos } func (r remoteEngine) NewRangeQuery(opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { return &remoteQuery{ logger: r.logger, - client: r.Client, + client: r.client, opts: r.opts, qs: qs, @@ -210,24 +259,16 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { return &promql.Result{Value: result} } -func (r *remoteQuery) Close() { - r.Cancel() -} +func (r *remoteQuery) Close() { r.Cancel() } -func (r *remoteQuery) Statement() parser.Statement { - return nil -} +func (r *remoteQuery) Statement() parser.Statement { return nil } -func (r *remoteQuery) Stats() *stats.Statistics { - return nil -} +func (r *remoteQuery) Stats() *stats.Statistics { return nil } + +func (r *remoteQuery) String() string { return r.qs } func (r *remoteQuery) Cancel() { if r.cancel != nil { r.cancel() } } - -func (r *remoteQuery) String() string { - return r.qs -} diff --git a/pkg/query/remote_engine_test.go b/pkg/query/remote_engine_test.go index ae9bb15040e..e9b75961e08 100644 --- a/pkg/query/remote_engine_test.go +++ b/pkg/query/remote_engine_test.go @@ -4,56 +4,163 @@ package query import ( + "math" "testing" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/thanos-io/thanos/pkg/store/labelpb" ) func TestRemoteEngine_LabelSets(t *testing.T) { tests := []struct { name string - labelSets []labels.Labels + tsdbInfos []infopb.TSDBInfo replicaLabels []string expected []labels.Labels }{ { name: "empty label sets", - labelSets: []labels.Labels{}, + tsdbInfos: []infopb.TSDBInfo{}, expected: []labels.Labels{}, }, { name: "empty label sets with replica labels", - labelSets: []labels.Labels{}, + tsdbInfos: []infopb.TSDBInfo{}, replicaLabels: []string{"replica"}, expected: []labels.Labels{}, }, { - name: "non-empty label sets", - labelSets: []labels.Labels{labels.FromStrings("a", "1")}, - expected: []labels.Labels{labels.FromStrings("a", "1")}, + name: "non-empty label sets", + tsdbInfos: []infopb.TSDBInfo{{ + Labels: zLabelSetFromStrings("a", "1"), + }}, + expected: []labels.Labels{labels.FromStrings("a", "1")}, }, { - name: "non-empty label sets with replica labels", - labelSets: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + name: "non-empty label sets with replica labels", + tsdbInfos: []infopb.TSDBInfo{{ + Labels: zLabelSetFromStrings("a", "1", "b", "2"), + }}, replicaLabels: []string{"a"}, expected: []labels.Labels{labels.FromStrings("b", "2")}, }, { - name: "replica labels not in label sets", - labelSets: []labels.Labels{labels.FromStrings("a", "1", "c", "2")}, + name: "replica labels not in label sets", + tsdbInfos: []infopb.TSDBInfo{ + { + Labels: zLabelSetFromStrings("a", "1", "c", "2"), + }, + }, replicaLabels: []string{"a", "b"}, expected: []labels.Labels{labels.FromStrings("c", "2")}, }, } - for _, test := range tests { - client := Client{labelSets: test.labelSets} - engine := newRemoteEngine(log.NewNopLogger(), client, Opts{ - ReplicaLabels: test.replicaLabels, + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + client := NewClient(nil, "", testCase.tsdbInfos) + engine := newRemoteEngine(log.NewNopLogger(), client, Opts{ + ReplicaLabels: testCase.replicaLabels, + }) + + testutil.Equals(t, testCase.expected, engine.LabelSets()) }) + } +} + +func TestRemoteEngine_MinT(t *testing.T) { + tests := []struct { + name string + tsdbInfos []infopb.TSDBInfo + replicaLabels []string + expected int64 + }{ + { + name: "empty label sets", + tsdbInfos: []infopb.TSDBInfo{}, + expected: math.MaxInt64, + }, + { + name: "empty label sets with replica labels", + tsdbInfos: []infopb.TSDBInfo{}, + replicaLabels: []string{"replica"}, + expected: math.MaxInt64, + }, + { + name: "non-empty label sets", + tsdbInfos: []infopb.TSDBInfo{{ + Labels: zLabelSetFromStrings("a", "1"), + MinTime: 30, + }}, + expected: 30, + }, + { + name: "non-empty label sets with replica labels", + tsdbInfos: []infopb.TSDBInfo{{ + Labels: zLabelSetFromStrings("a", "1", "b", "2"), + MinTime: 30, + }}, + replicaLabels: []string{"a"}, + expected: 30, + }, + { + name: "replicated labelsets with different mint", + tsdbInfos: []infopb.TSDBInfo{ + { + Labels: zLabelSetFromStrings("a", "1", "replica", "1"), + MinTime: 30, + }, + { + Labels: zLabelSetFromStrings("a", "1", "replica", "2"), + MinTime: 60, + }, + }, + replicaLabels: []string{"replica"}, + expected: 60, + }, + { + name: "multiple replicated labelsets with different mint", + tsdbInfos: []infopb.TSDBInfo{ + { + Labels: zLabelSetFromStrings("a", "1", "replica", "1"), + MinTime: 30, + }, + { + Labels: zLabelSetFromStrings("a", "1", "replica", "2"), + MinTime: 60, + }, + { + Labels: zLabelSetFromStrings("a", "2", "replica", "1"), + MinTime: 80, + }, + { + Labels: zLabelSetFromStrings("a", "2", "replica", "2"), + MinTime: 120, + }, + }, + replicaLabels: []string{"replica"}, + expected: 60, + }, + } + + for _, testCase := range tests { + t.Run(testCase.name, func(t *testing.T) { + client := NewClient(nil, "", testCase.tsdbInfos) + engine := newRemoteEngine(log.NewNopLogger(), client, Opts{ + ReplicaLabels: testCase.replicaLabels, + }) + + testutil.Equals(t, testCase.expected, engine.MinT()) + }) + } +} - testutil.Equals(t, test.expected, engine.LabelSets()) +func zLabelSetFromStrings(ss ...string) labelpb.ZLabelSet { + return labelpb.ZLabelSet{ + Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings(ss...)), } } diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 63357d85609..d4588da2698 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -24,6 +24,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/thanos-io/thanos/pkg/api/status" + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/objstore" @@ -92,17 +93,23 @@ func NewMultiTSDB( type localClient struct { storepb.StoreClient - labelSetFunc func() []labelpb.ZLabelSet timeRangeFunc func() (int64, int64) + tsdbOpts *tsdb.Options } -func newLocalClient( +func NewLocalClient( c storepb.StoreClient, labelSetFunc func() []labelpb.ZLabelSet, timeRangeFunc func() (int64, int64), -) *localClient { - return &localClient{c, labelSetFunc, timeRangeFunc} + tsdbOpts *tsdb.Options, +) store.Client { + return &localClient{ + StoreClient: c, + labelSetFunc: labelSetFunc, + timeRangeFunc: timeRangeFunc, + tsdbOpts: tsdbOpts, + } } func (l *localClient) LabelSets() []labels.Labels { @@ -113,6 +120,22 @@ func (l *localClient) TimeRange() (mint int64, maxt int64) { return l.timeRangeFunc() } +func (l *localClient) TSDBInfos() []infopb.TSDBInfo { + labelsets := l.labelSetFunc() + if len(labelsets) == 0 { + return []infopb.TSDBInfo{} + } + + mint, maxt := l.timeRangeFunc() + return []infopb.TSDBInfo{ + { + Labels: labelsets[0], + MinTime: mint, + MaxTime: maxt, + }, + } +} + func (l *localClient) String() string { mint, maxt := l.timeRangeFunc() return fmt.Sprintf( @@ -159,7 +182,7 @@ func (t *tenant) store() *store.TSDBStore { return t.storeTSDB } -func (t *tenant) client(logger log.Logger) store.Client { +func (t *tenant) client(logger log.Logger, tsdbOpts *tsdb.Options) store.Client { t.mtx.RLock() defer t.mtx.RUnlock() @@ -167,8 +190,9 @@ func (t *tenant) client(logger log.Logger) store.Client { if tsdbStore == nil { return nil } + client := storepb.ServerAsClient(store.NewRecoverableStoreServer(logger, tsdbStore), 0) - return newLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange) + return NewLocalClient(client, tsdbStore.LabelSet, tsdbStore.TimeRange, tsdbOpts) } func (t *tenant) exemplars() *exemplars.TSDB { @@ -467,7 +491,7 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client { res := make([]store.Client, 0, len(t.tenants)) for _, tenant := range t.tenants { - client := tenant.client(t.logger) + client := tenant.client(t.logger, t.tsdbOpts) if client != nil { res = append(res, client) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 05e82607e3d..e6be9d0f003 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -53,6 +53,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/runutil" @@ -769,6 +770,25 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) { return mint, maxt } +// TSDBInfos returns a list of infopb.TSDBInfos for blocks in the bucket store. +func (s *BucketStore) TSDBInfos() []infopb.TSDBInfo { + s.mtx.RLock() + defer s.mtx.RUnlock() + + infos := make([]infopb.TSDBInfo, 0, len(s.blocks)) + for _, b := range s.blocks { + infos = append(infos, infopb.TSDBInfo{ + Labels: labelpb.ZLabelSet{ + Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(b.meta.Thanos.Labels)), + }, + MinTime: b.meta.MinTime, + MaxTime: b.meta.MaxTime, + }) + } + + return infos +} + func (s *BucketStore) LabelSet() []labelpb.ZLabelSet { s.mtx.RLock() labelSets := s.advLabelSets diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index e4444c718ff..20746fa0bcd 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -30,17 +30,19 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb/chunkenc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/httpconfig" + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" "github.com/thanos-io/thanos/pkg/tracing" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // PrometheusStore implements the store node API on top of the Prometheus remote read API. @@ -755,6 +757,24 @@ func (p *PrometheusStore) LabelSet() []labelpb.ZLabelSet { return labelset } +func (p *PrometheusStore) TSDBInfos() []infopb.TSDBInfo { + labels := p.LabelSet() + if len(labels) == 0 { + return []infopb.TSDBInfo{} + } + + mint, maxt := p.Timestamps() + return []infopb.TSDBInfo{ + { + Labels: labelpb.ZLabelSet{ + Labels: labels[0].Labels, + }, + MinTime: mint, + MaxTime: maxt, + }, + } +} + func (p *PrometheusStore) Timestamps() (mint int64, maxt int64) { return p.timestamps() } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index d6056b524b6..0b5ac178e3f 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" @@ -32,6 +33,9 @@ import ( type ctxKey int +// UninitializedTSDBTime is the TSDB start time of an uninitialized TSDB instance. +const UninitializedTSDBTime = math.MaxInt64 + // StoreMatcherKey is the context key for the store's allow list. const StoreMatcherKey = ctxKey(0) @@ -50,6 +54,9 @@ type Client interface { // TimeRange returns minimum and maximum time range of data in the store. TimeRange() (mint int64, maxt int64) + // TSDBInfos returns metadata about each TSDB backed by the client. + TSDBInfos() []infopb.TSDBInfo + // SupportsSharding returns true if sharding is supported by the underlying store. SupportsSharding() bool @@ -234,6 +241,7 @@ func (s *ProxyStore) LabelSet() []labelpb.ZLabelSet { return labelSets } + func (s *ProxyStore) TimeRange() (int64, int64) { stores := s.stores() if len(stores) == 0 { @@ -254,6 +262,14 @@ func (s *ProxyStore) TimeRange() (int64, int64) { return minTime, maxTime } +func (s *ProxyStore) TSDBInfos() []infopb.TSDBInfo { + infos := make([]infopb.TSDBInfo, 0) + for _, store := range s.stores() { + infos = append(infos, store.TSDBInfos()...) + } + return infos +} + func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { // TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be // tiggered by tracing span to reduce cognitive load. diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 61d9bc1965d..aeade0a0f92 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/status" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index b6916005a38..90874842d69 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -5,6 +5,8 @@ package storetestutil import ( "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -18,10 +20,12 @@ type TestClient struct { Shardable bool WithoutReplicaLabelsEnabled bool IsLocalStore bool + StoreTSDBInfos []infopb.TSDBInfo } func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } func (c TestClient) TimeRange() (mint, maxt int64) { return c.MinTime, c.MaxTime } +func (c TestClient) TSDBInfos() []infopb.TSDBInfo { return c.StoreTSDBInfos } func (c TestClient) SupportsSharding() bool { return c.Shardable } func (c TestClient) SupportsWithoutReplicaLabels() bool { return c.WithoutReplicaLabelsEnabled } func (c TestClient) String() string { return c.Name } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 83d89725100..bea8f04ac9a 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -113,6 +114,24 @@ func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet { return labelSets } +func (p *TSDBStore) TSDBInfos() []infopb.TSDBInfo { + labels := p.LabelSet() + if len(labels) == 0 { + return []infopb.TSDBInfo{} + } + + mint, maxt := p.TimeRange() + return []infopb.TSDBInfo{ + { + Labels: labelpb.ZLabelSet{ + Labels: labels[0].Labels, + }, + MinTime: mint, + MaxTime: maxt, + }, + } +} + func (s *TSDBStore) TimeRange() (int64, int64) { var minTime int64 = math.MinInt64 startTime, err := s.db.StartTime()