diff --git a/Makefile b/Makefile index bd95026346..bc561c6077 100644 --- a/Makefile +++ b/Makefile @@ -185,7 +185,7 @@ docs: $(EMBEDMD) build check-docs: ## checks docs against discrepancy with flags, links, white noise. check-docs: $(EMBEDMD) $(LICHE) build @EMBEDMD_BIN="$(EMBEDMD)" SED_BIN="$(SED)" scripts/genflagdocs.sh check - @$(LICHE) --recursive docs --exclude "(couchdb.apache.org/bylaws.html|cloud.tencent.com|alibabacloud.com)" --document-root . + @$(LICHE) --recursive docs --exclude "(couchdb.apache.org/bylaws.html|cloud.tencent.com|alibabacloud.com|zoom.us)" --document-root . @$(LICHE) --exclude "goreportcard.com" --document-root . *.md @find . -type f -name "*.md" | SED_BIN="$(SED)" xargs scripts/cleanup-white-noise.sh $(call require_clean_work_tree,"check documentation") diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 7f716562ba..fcd8f6da29 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -284,6 +284,7 @@ func runStore( !disableIndexHeader, enablePostingsCompression, postingOffsetsInMemSampling, + false, ) if err != nil { return errors.Wrap(err, "create object storage store") diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index ba5f0548ef..fa3ff3593d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -21,6 +21,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/types" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -43,6 +44,7 @@ import ( "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/runutil" storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tracing" @@ -257,6 +259,9 @@ type BucketStore struct { // When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller. enablePostingsCompression bool postingOffsetsInMemSampling int + + // Enables hints in the Series() response. + enableSeriesHints bool } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -278,6 +283,7 @@ func NewBucketStore( enableIndexHeader bool, enablePostingsCompression bool, postingOffsetsInMemSampling int, + enableSeriesHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility. ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -315,6 +321,7 @@ func NewBucketStore( enableIndexHeader: enableIndexHeader, enablePostingsCompression: enablePostingsCompression, postingOffsetsInMemSampling: postingOffsetsInMemSampling, + enableSeriesHints: enableSeriesHints, } s.metrics = metrics @@ -868,6 +875,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie res []storepb.SeriesSet mtx sync.Mutex g, gctx = errgroup.WithContext(ctx) + hints = &hintspb.SeriesResponseHints{} ) s.mtx.RLock() @@ -891,6 +899,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie for _, b := range blocks { b := b + if s.enableSeriesHints { + // Keep track of queried blocks. + hints.AddQueriedBlock(b.meta.ULID) + } + // We must keep the readers open until all their data has been sent. indexr := b.indexReader(gctx) chunkr := b.chunkReader(gctx) @@ -1000,6 +1013,21 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie err = nil }) + + if s.enableSeriesHints { + var anyHints *types.Any + + if anyHints, err = types.MarshalAny(hints); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "marshal series response hints").Error()) + return + } + + if err = srv.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error()) + return + } + } + return err } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 25ceda8ef2..12c3b45680 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -169,6 +169,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m true, true, DefaultPostingOffsetInMemorySampling, + true, ) testutil.Ok(t, err) s.store = store diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 0e5ccd7d64..1e8192ec1a 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -23,6 +23,7 @@ import ( "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" + "github.com/gogo/protobuf/types" "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" @@ -45,6 +46,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore/filesystem" "github.com/thanos-io/thanos/pkg/pool" storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" @@ -481,6 +483,7 @@ func TestBucketStore_Info(t *testing.T) { true, true, DefaultPostingOffsetInMemorySampling, + false, ) testutil.Ok(t, err) @@ -734,6 +737,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul true, true, DefaultPostingOffsetInMemorySampling, + false, ) testutil.Ok(t, err) @@ -1349,11 +1353,12 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, }, }, - expected: expected, + expectedSeries: expected, }) } fmt.Println("Starting") + benchmarkSeries(t, store, bCases) if !t.IsBenchmark() { // Make sure the pool is correctly used. This is expected for 200k numbers. @@ -1411,9 +1416,10 @@ type noopLimiter struct{} func (noopLimiter) Check(uint64) error { return nil } type benchSeriesCase struct { - name string - req *storepb.SeriesRequest - expected []storepb.Series + name string + req *storepb.SeriesRequest + expectedSeries []storepb.Series + expectedHints []hintspb.SeriesResponseHints } func benchmarkSeries(t testutil.TB, store *BucketStore, cases []*benchSeriesCase) { @@ -1424,17 +1430,25 @@ func benchmarkSeries(t testutil.TB, store *BucketStore, cases []*benchSeriesCase srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, store.Series(c.req, srv)) testutil.Equals(t, 0, len(srv.Warnings)) - testutil.Equals(t, len(c.expected), len(srv.SeriesSet)) + testutil.Equals(t, len(c.expectedSeries), len(srv.SeriesSet)) if !t.IsBenchmark() { - if len(c.expected) == 1 { + if len(c.expectedSeries) == 1 { // Chunks are not sorted within response. TODO: Investigate: Is this fine? sort.Slice(srv.SeriesSet[0].Chunks, func(i, j int) bool { return srv.SeriesSet[0].Chunks[i].MinTime < srv.SeriesSet[0].Chunks[j].MinTime }) } // This might give unreadable output for millions of series if error. - testutil.Equals(t, c.expected, srv.SeriesSet) + testutil.Equals(t, c.expectedSeries, srv.SeriesSet) + + var actualHints []hintspb.SeriesResponseHints + for _, anyHints := range srv.HintsSet { + hints := hintspb.SeriesResponseHints{} + testutil.Ok(t, types.UnmarshalAny(anyHints, &hints)) + actualHints = append(actualHints, hints) + } + testutil.Equals(t, c.expectedHints, actualHints) } } @@ -1617,3 +1631,107 @@ func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { testutil.Equals(t, numSeries, len(srv.SeriesSet)) }) } + +func TestSeries_HintsEnabled(t *testing.T) { + tb := testutil.NewTB(t) + + tmpDir, err := ioutil.TempDir("", "test-series-hints-enabled") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bktDir := filepath.Join(tmpDir, "bkt") + bkt, err := filesystem.NewBucket(bktDir) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + var ( + logger = log.NewNopLogger() + instrBkt = objstore.WithNoopInstr(bkt) + ) + + // Create TSDB blocks. + block1, seriesSet1 := createBlockWithOneSample(tb, bktDir, 0, 2) + block2, seriesSet2 := createBlockWithOneSample(tb, bktDir, 1, 2) + + // Inject the Thanos meta to each block in the storage. + thanosMeta := metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + } + + for _, blockID := range []ulid.ULID{block1, block2} { + _, err := metadata.InjectThanos(logger, filepath.Join(bktDir, blockID.String()), thanosMeta, nil) + testutil.Ok(t, err) + } + + // Instance a real bucket store we'll use to query back the series. + fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil, nil) + testutil.Ok(tb, err) + + indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) + testutil.Ok(tb, err) + + store, err := NewBucketStore( + logger, + nil, + instrBkt, + fetcher, + tmpDir, + indexCache, + 1000000, + 10000, + 10, + false, + 10, + nil, + false, + true, + true, + DefaultPostingOffsetInMemorySampling, + true, + ) + testutil.Ok(tb, err) + testutil.Ok(tb, store.SyncBlocks(context.Background())) + + testCases := []*benchSeriesCase{ + { + name: "query a single block", + req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 1, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + }, + expectedSeries: seriesSet1, + expectedHints: []hintspb.SeriesResponseHints{ + { + QueriedBlocks: []hintspb.Block{ + {Id: block1.String()}, + }, + }, + }, + }, { + name: "query multiple blocks", + req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + }, + expectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...), + expectedHints: []hintspb.SeriesResponseHints{ + { + QueriedBlocks: []hintspb.Block{ + {Id: block1.String()}, + {Id: block2.String()}, + }, + }, + }, + }, + } + + benchmarkSeries(tb, store, testCases) +} diff --git a/pkg/store/hintspb/custom.go b/pkg/store/hintspb/custom.go new file mode 100644 index 0000000000..91c081a9fa --- /dev/null +++ b/pkg/store/hintspb/custom.go @@ -0,0 +1,12 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package hintspb + +import "github.com/oklog/ulid" + +func (m *SeriesResponseHints) AddQueriedBlock(id ulid.ULID) { + m.QueriedBlocks = append(m.QueriedBlocks, Block{ + Id: id.String(), + }) +} diff --git a/pkg/store/hintspb/hints.pb.go b/pkg/store/hintspb/hints.pb.go new file mode 100644 index 0000000000..42a4f4e16e --- /dev/null +++ b/pkg/store/hintspb/hints.pb.go @@ -0,0 +1,492 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: hints.proto + +package hintspb + +import ( + fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type SeriesResponseHints struct { + /// queried_blocks is the list of blocks that have been queried. + QueriedBlocks []Block `protobuf:"bytes,1,rep,name=queried_blocks,json=queriedBlocks,proto3" json:"queried_blocks"` +} + +func (m *SeriesResponseHints) Reset() { *m = SeriesResponseHints{} } +func (m *SeriesResponseHints) String() string { return proto.CompactTextString(m) } +func (*SeriesResponseHints) ProtoMessage() {} +func (*SeriesResponseHints) Descriptor() ([]byte, []int) { + return fileDescriptor_522be8e0d2634375, []int{0} +} +func (m *SeriesResponseHints) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SeriesResponseHints) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SeriesResponseHints.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SeriesResponseHints) XXX_Merge(src proto.Message) { + xxx_messageInfo_SeriesResponseHints.Merge(m, src) +} +func (m *SeriesResponseHints) XXX_Size() int { + return m.Size() +} +func (m *SeriesResponseHints) XXX_DiscardUnknown() { + xxx_messageInfo_SeriesResponseHints.DiscardUnknown(m) +} + +var xxx_messageInfo_SeriesResponseHints proto.InternalMessageInfo + +type Block struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (m *Block) Reset() { *m = Block{} } +func (m *Block) String() string { return proto.CompactTextString(m) } +func (*Block) ProtoMessage() {} +func (*Block) Descriptor() ([]byte, []int) { + return fileDescriptor_522be8e0d2634375, []int{1} +} +func (m *Block) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Block) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Block.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Block) XXX_Merge(src proto.Message) { + xxx_messageInfo_Block.Merge(m, src) +} +func (m *Block) XXX_Size() int { + return m.Size() +} +func (m *Block) XXX_DiscardUnknown() { + xxx_messageInfo_Block.DiscardUnknown(m) +} + +var xxx_messageInfo_Block proto.InternalMessageInfo + +func init() { + proto.RegisterType((*SeriesResponseHints)(nil), "hintspb.SeriesResponseHints") + proto.RegisterType((*Block)(nil), "hintspb.Block") +} + +func init() { proto.RegisterFile("hints.proto", fileDescriptor_522be8e0d2634375) } + +var fileDescriptor_522be8e0d2634375 = []byte{ + // 182 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0xc8, 0xcc, 0x2b, + 0x29, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x07, 0x73, 0x0a, 0x92, 0xa4, 0x44, 0xd2, + 0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x29, 0x88, 0x4b, 0x38, 0x38, 0xb5, + 0x28, 0x33, 0xb5, 0x38, 0x28, 0xb5, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0xd5, 0x03, 0xa4, 0x5c, 0xc8, + 0x9a, 0x8b, 0xaf, 0xb0, 0x14, 0x24, 0x9e, 0x12, 0x9f, 0x94, 0x93, 0x9f, 0x9c, 0x5d, 0x2c, 0xc1, + 0xa8, 0xc0, 0xac, 0xc1, 0x6d, 0xc4, 0xa7, 0x07, 0x35, 0x4e, 0xcf, 0x09, 0x24, 0xec, 0xc4, 0x72, + 0xe2, 0x9e, 0x3c, 0x43, 0x10, 0x2f, 0x54, 0x2d, 0x58, 0xac, 0x58, 0x49, 0x9c, 0x8b, 0x15, 0xcc, + 0x12, 0xe2, 0xe3, 0x62, 0xca, 0x4c, 0x91, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x62, 0xca, 0x4c, + 0x71, 0x52, 0x3d, 0xf1, 0x50, 0x8e, 0xe1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, + 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, + 0xa2, 0x60, 0x2e, 0x4d, 0x62, 0x03, 0x3b, 0xcd, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x84, 0x7b, + 0x43, 0xa2, 0xc8, 0x00, 0x00, 0x00, +} + +func (m *SeriesResponseHints) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SeriesResponseHints) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeriesResponseHints) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.QueriedBlocks) > 0 { + for iNdEx := len(m.QueriedBlocks) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.QueriedBlocks[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintHints(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *Block) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Block) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Block) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = encodeVarintHints(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintHints(dAtA []byte, offset int, v uint64) int { + offset -= sovHints(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *SeriesResponseHints) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.QueriedBlocks) > 0 { + for _, e := range m.QueriedBlocks { + l = e.Size() + n += 1 + l + sovHints(uint64(l)) + } + } + return n +} + +func (m *Block) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + sovHints(uint64(l)) + } + return n +} + +func sovHints(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozHints(x uint64) (n int) { + return sovHints(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *SeriesResponseHints) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SeriesResponseHints: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SeriesResponseHints: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field QueriedBlocks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHints + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthHints + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.QueriedBlocks = append(m.QueriedBlocks, Block{}) + if err := m.QueriedBlocks[len(m.QueriedBlocks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHints + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthHints + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Block) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Block: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Block: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHints + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthHints + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthHints + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthHints + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipHints(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHints + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHints + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHints + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthHints + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupHints + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthHints + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthHints = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowHints = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupHints = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/store/hintspb/hints.proto b/pkg/store/hintspb/hints.proto new file mode 100644 index 0000000000..2bd4a6cc23 --- /dev/null +++ b/pkg/store/hintspb/hints.proto @@ -0,0 +1,28 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +syntax = "proto3"; +package hintspb; + +import "gogoproto/gogo.proto"; + +option go_package = "hintspb"; + +option (gogoproto.sizer_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +// Do not generate XXX fields to reduce memory. +option (gogoproto.goproto_unkeyed_all) = false; +option (gogoproto.goproto_unrecognized_all) = false; +option (gogoproto.goproto_sizecache_all) = false; + +message SeriesResponseHints { + /// queried_blocks is the list of blocks that have been queried. + repeated Block queried_blocks = 1 [(gogoproto.nullable) = false]; +} + +message Block { + string id = 1; +} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 32416d8b29..f4e8849904 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -14,6 +14,7 @@ import ( "github.com/fortytw2/leaktest" "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -1363,6 +1364,7 @@ type storeSeriesServer struct { SeriesSet []storepb.Series Warnings []string + HintsSet []*types.Any Size int64 } @@ -1384,6 +1386,11 @@ func (s *storeSeriesServer) Send(r *storepb.SeriesResponse) error { return nil } + if r.GetHints() != nil { + s.HintsSet = append(s.HintsSet, r.GetHints()) + return nil + } + // Unsupported field, skip. return nil } diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index ae5a099e6e..781c6d761f 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -7,6 +7,7 @@ import ( "strings" "unsafe" + "github.com/gogo/protobuf/types" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -35,6 +36,14 @@ func NewSeriesResponse(series *Series) *SeriesResponse { } } +func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { + return &SeriesResponse{ + Result: &SeriesResponse_Hints{ + Hints: hints, + }, + } +} + // CompareLabels compares two sets of labels. func CompareLabels(a, b []Label) int { l := len(a) diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 2cf23474d3..6ca11ecbe3 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -24,7 +24,7 @@ GO111MODULE=on go install "github.com/gogo/protobuf/protoc-gen-gogofast" GOGOPROTO_ROOT="$(GO111MODULE=on go list -f '{{ .Dir }}' -m github.com/gogo/protobuf)" GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf" -DIRS="pkg/store/storepb pkg/store/storepb/prompb" +DIRS="pkg/store/storepb pkg/store/storepb/prompb pkg/store/hintspb" echo "generating code" for dir in ${DIRS}; do