diff --git a/.mockery.yaml b/.mockery.yaml index 95d8c88f48..9e588ffff0 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -27,3 +27,7 @@ packages: github.com/grafana/pyroscope/pkg/experiment/metastore/dlq: interfaces: LocalServer: + github.com/grafana/pyroscope/pkg/experiment/metastore/index: + interfaces: + Store: + config: diff --git a/pkg/experiment/ingester/segment_test.go b/pkg/experiment/ingester/segment_test.go index 2689c567f7..fc6ee55a1f 100644 --- a/pkg/experiment/ingester/segment_test.go +++ b/pkg/experiment/ingester/segment_test.go @@ -4,9 +4,6 @@ import ( "bytes" "context" "fmt" - "github.com/grafana/dskit/flagext" - "github.com/grafana/pyroscope/pkg/experiment/metastore" - "github.com/grafana/pyroscope/pkg/test/mocks/mockdlq" "io" "math/rand" "path/filepath" @@ -16,7 +13,13 @@ import ( "testing" "time" + "github.com/grafana/dskit/flagext" + + "github.com/grafana/pyroscope/pkg/experiment/metastore" + "github.com/grafana/pyroscope/pkg/test/mocks/mockdlq" + gprofile "github.com/google/pprof/profile" + profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1" ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1" "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1/ingesterv1connect" @@ -414,9 +417,9 @@ func TestDLQRecoveryMock(t *testing.T) { func TestDLQRecovery(t *testing.T) { const tenant = "tb" - const ts = 239 + var ts = time.Now().UnixMilli() chunk := inputChunk([]input{ - {shard: 1, tenant: tenant, profile: cpuProfile(42, ts, "svc1", "kek", "foo", "bar")}, + {shard: 1, tenant: tenant, profile: cpuProfile(42, int(ts), "svc1", "kek", "foo", "bar")}, }) sw := newTestSegmentWriter(t, segmentWriterConfig{ diff --git a/pkg/experiment/metastore/index/index.go b/pkg/experiment/metastore/index/index.go new file mode 100644 index 0000000000..0db904586b --- /dev/null +++ b/pkg/experiment/metastore/index/index.go @@ -0,0 +1,519 @@ +package index + +import ( + "context" + "flag" + "fmt" + "slices" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/common/model" + "golang.org/x/sync/errgroup" + + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" +) + +type cacheKey struct { + partitionKey PartitionKey + tenant string +} + +type Index struct { + Config *Config + + partitionMu sync.Mutex + loadedPartitions map[cacheKey]*indexPartition + allPartitions []*PartitionMeta + + store Store + logger log.Logger +} + +type Config struct { + PartitionDuration time.Duration `yaml:"partition_duration"` + PartitionCacheSize int `yaml:"partition_cache_size"` + QueryLookaroundPeriod time.Duration `yaml:"query_lookaround_period"` +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.PartitionDuration, prefix+"partition-duration", DefaultConfig.PartitionDuration, "") + f.IntVar(&cfg.PartitionCacheSize, prefix+"partition-cache-size", DefaultConfig.PartitionCacheSize, "How many partitions to keep loaded in memory.") + f.DurationVar(&cfg.QueryLookaroundPeriod, prefix+"query-lookaround-period", DefaultConfig.QueryLookaroundPeriod, "") +} + +var DefaultConfig = Config{ + PartitionDuration: 24 * time.Hour, + PartitionCacheSize: 7, + QueryLookaroundPeriod: time.Hour, +} + +type indexPartition struct { + meta *PartitionMeta + accessedAt time.Time + shards map[uint32]*indexShard +} + +type indexShard struct { + blocks map[string]*metastorev1.BlockMeta +} + +type Store interface { + ListPartitions() []PartitionKey + 
ListShards(p PartitionKey) []uint32 + ListTenants(p PartitionKey, shard uint32) []string + ListBlocks(p PartitionKey, shard uint32, tenant string) []*metastorev1.BlockMeta +} + +// NewIndex initializes a new metastore index. +// +// The index provides access to block metadata. The data is partitioned by time, shard and tenant. Partition identifiers +// contain the time period referenced by partitions, e.g., "20240923T16.1h" refers to a partition for the 1-hour period +// between 2024-09-23T16:00:00.000Z and 2024-09-23T16:59:59.999Z. +// +// Partitions are mostly transparent for the end user, though PartitionMeta is at times used externally. Partition +// durations are configurable (at application level). +// +// The index requires a backing Store for loading data in memory. Data is loaded directly via LoadPartitions() or when +// looking up blocks with FindBlock() or FindBlocksInRange(). +func NewIndex(store Store, logger log.Logger, cfg *Config) *Index { + + // A fixed cache size gives us bounded memory footprint, however changes to the partition duration could reduce + // the cache effectiveness. + // TODO (aleks-p): + // - resize the cache at runtime when the config changes + // - consider auto-calculating the cache size to ensure we hold data for e.g., the last 24 hours + return &Index{ + loadedPartitions: make(map[cacheKey]*indexPartition, cfg.PartitionCacheSize), + allPartitions: make([]*PartitionMeta, 0), + store: store, + logger: logger, + Config: cfg, + } +} + +// LoadPartitions reads all partitions from the backing store and loads the recent ones in memory. +func (i *Index) LoadPartitions() { + i.partitionMu.Lock() + defer i.partitionMu.Unlock() + + i.allPartitions = make([]*PartitionMeta, 0) + for _, key := range i.store.ListPartitions() { + pMeta := i.loadPartitionMeta(key) + level.Info(i.logger).Log( + "msg", "loaded metastore index partition", + "key", key, + "ts", pMeta.Ts.Format(time.RFC3339), + "duration", pMeta.Duration, + "tenants", strings.Join(pMeta.Tenants, ",")) + i.allPartitions = append(i.allPartitions, pMeta) + + // load the currently active partition + if pMeta.contains(time.Now().UTC().UnixMilli()) { + i.loadEntirePartition(pMeta) + } + } + level.Info(i.logger).Log("msg", "loaded metastore index partitions", "count", len(i.allPartitions)) + + i.sortPartitions() +} + +func (i *Index) loadPartitionMeta(key PartitionKey) *PartitionMeta { + t, dur, _ := key.Parse() + pMeta := &PartitionMeta{ + Key: key, + Ts: t, + Duration: dur, + Tenants: make([]string, 0), + tenantMap: make(map[string]struct{}), + } + for _, s := range i.store.ListShards(key) { + for _, t := range i.store.ListTenants(key, s) { + pMeta.AddTenant(t) + } + } + return pMeta +} + +// ForEachPartition executes the given function concurrently for each partition. It will be called for all partitions, +// regardless if they are fully loaded in memory or not. 
+func (i *Index) ForEachPartition(ctx context.Context, fn func(meta *PartitionMeta) error) error { + i.partitionMu.Lock() + defer i.partitionMu.Unlock() + + g, ctx := errgroup.WithContext(ctx) + for _, meta := range i.allPartitions { + g.Go(func() error { + return fn(meta) + }) + } + err := g.Wait() + if err != nil { + level.Error(i.logger).Log("msg", "error during partition iteration", "err", err) + return err + } + return nil +} + +func (i *Index) loadEntirePartition(meta *PartitionMeta) { + for _, s := range i.store.ListShards(meta.Key) { + for _, t := range i.store.ListTenants(meta.Key, s) { + cKey := cacheKey{ + partitionKey: meta.Key, + tenant: t, + } + p, ok := i.loadedPartitions[cKey] + if !ok { + p = &indexPartition{ + meta: meta, + accessedAt: time.Now(), + shards: make(map[uint32]*indexShard), + } + i.loadedPartitions[cKey] = p + } + sh, ok := p.shards[s] + if !ok { + sh = &indexShard{ + blocks: make(map[string]*metastorev1.BlockMeta), + } + p.shards[s] = sh + } + for _, b := range i.store.ListBlocks(meta.Key, s, t) { + sh.blocks[b.Id] = b + } + } + } +} + +func (i *Index) getOrLoadPartition(meta *PartitionMeta, tenant string) *indexPartition { + cKey := cacheKey{ + partitionKey: meta.Key, + tenant: tenant, + } + p, ok := i.loadedPartitions[cKey] + if !ok { + p = &indexPartition{ + meta: meta, + shards: make(map[uint32]*indexShard), + } + for _, s := range i.store.ListShards(meta.Key) { + sh := &indexShard{ + blocks: make(map[string]*metastorev1.BlockMeta), + } + p.shards[s] = sh + for _, b := range i.store.ListBlocks(meta.Key, s, tenant) { + sh.blocks[b.Id] = b + } + } + i.loadedPartitions[cKey] = p + } + p.accessedAt = time.Now().UTC() + i.unloadPartitions() + return p +} + +// CreatePartitionKey creates a partition key for a block. It is meant to be used for newly inserted blocks, as it relies +// on the index's currently configured partition duration to create the key. +// +// Note: Using this for existing blocks following a partition duration change can produce the wrong key. Callers should +// verify that the returned partition actually contains the block. +func (i *Index) CreatePartitionKey(blockId string) PartitionKey { + t := ulid.Time(ulid.MustParse(blockId).Time()).UTC() + + var b strings.Builder + b.Grow(16) + + year, month, day := t.Date() + b.WriteString(fmt.Sprintf("%04d%02d%02d", year, month, day)) + + partitionDuration := i.Config.PartitionDuration + if partitionDuration < 24*time.Hour { + hour := (t.Hour() / int(partitionDuration.Hours())) * int(partitionDuration.Hours()) + b.WriteString(fmt.Sprintf("T%02d", hour)) + } + + mDuration := model.Duration(partitionDuration) + b.WriteString(".") + b.WriteString(mDuration.String()) + + return PartitionKey(b.String()) +} + +// findPartitionMeta retrieves the partition meta for the given key. +func (i *Index) findPartitionMeta(key PartitionKey) *PartitionMeta { + for _, p := range i.allPartitions { + if p.Key == key { + return p + } + } + return nil +} + +// InsertBlock is the primary way for adding blocks to the index. +func (i *Index) InsertBlock(b *metastorev1.BlockMeta) { + i.partitionMu.Lock() + defer i.partitionMu.Unlock() + + i.insertBlock(b) +} + +// insertBlock is the underlying implementation for inserting blocks. It is the caller's responsibility to enforce safe +// concurrent access. The method will create a new partition if needed. 
+func (i *Index) insertBlock(b *metastorev1.BlockMeta) { + meta := i.getOrCreatePartitionMeta(b) + p := i.getOrLoadPartition(meta, b.TenantId) + + s, ok := p.shards[b.Shard] + if !ok { + s = &indexShard{ + blocks: make(map[string]*metastorev1.BlockMeta), + } + p.shards[b.Shard] = s + } + _, ok = s.blocks[b.Id] + if !ok { + s.blocks[b.Id] = b + } +} + +func (i *Index) getOrCreatePartitionMeta(b *metastorev1.BlockMeta) *PartitionMeta { + key := i.CreatePartitionKey(b.Id) + meta := i.findPartitionMeta(key) + + if meta == nil { + ts, duration, _ := key.Parse() + meta = &PartitionMeta{ + Key: key, + Ts: ts, + Duration: duration, + Tenants: make([]string, 0), + tenantMap: make(map[string]struct{}), + } + i.allPartitions = append(i.allPartitions, meta) + i.sortPartitions() + } + + if b.TenantId != "" { + meta.AddTenant(b.TenantId) + } else { + for _, ds := range b.Datasets { + meta.AddTenant(ds.TenantId) + } + } + + return meta +} + +// FindBlock tries to retrieve an existing block from the index. It will load the corresponding partition if it is not +// already loaded. Returns nil if the block cannot be found. +func (i *Index) FindBlock(shardNum uint32, tenant string, blockId string) *metastorev1.BlockMeta { + // first try the currently mapped partition + key := i.CreatePartitionKey(blockId) + i.partitionMu.Lock() + defer i.partitionMu.Unlock() + + b := i.findBlockInPartition(key, shardNum, tenant, blockId) + if b != nil { + return b + } + + // try other partitions that could contain the block + t := ulid.Time(ulid.MustParse(blockId).Time()).UTC().UnixMilli() + for _, p := range i.allPartitions { + if p.contains(t) { + b := i.findBlockInPartition(p.Key, shardNum, tenant, blockId) + if b != nil { + return b + } + } + } + return nil +} + +func (i *Index) findBlockInPartition(key PartitionKey, shard uint32, tenant string, blockId string) *metastorev1.BlockMeta { + meta := i.findPartitionMeta(key) + if meta == nil { + return nil + } + + p := i.getOrLoadPartition(meta, tenant) + + s, _ := p.shards[shard] + if s == nil { + return nil + } + + b, _ := s.blocks[blockId] + + return b +} + +// FindBlocksInRange retrieves all blocks that might contain data for the given time range and tenants. +// +// It is not enough to scan for partition keys that fall in the given time interval. Partitions are built on top of +// block identifiers which refer to the moment a block was created and not to the timestamps of the profiles contained +// within the block (min_time, max_time). This method works around this by including blocks from adjacent partitions. +func (i *Index) FindBlocksInRange(start, end int64, tenants map[string]struct{}) ([]*metastorev1.BlockMeta, error) { + i.partitionMu.Lock() + defer i.partitionMu.Unlock() + startWithLookaround := start - i.Config.QueryLookaroundPeriod.Milliseconds() + endWithLookaround := end + i.Config.QueryLookaroundPeriod.Milliseconds() + + blocks := make([]*metastorev1.BlockMeta, 0) + + for _, meta := range i.allPartitions { // TODO aleks-p: consider using binary search to find a good starting point + if meta.overlaps(startWithLookaround, endWithLookaround) { + for t := range tenants { + if !meta.HasTenant(t) { + continue + } + p := i.getOrLoadPartition(meta, t) + tenantBlocks := i.collectTenantBlocks(p, start, end) + blocks = append(blocks, tenantBlocks...) 
+ + // return mixed blocks as well, we rely on the caller to filter out the data per tenant / service + p = i.getOrLoadPartition(meta, "") + tenantBlocks = i.collectTenantBlocks(p, start, end) + blocks = append(blocks, tenantBlocks...) + } + } + } + + return blocks, nil +} + +func (i *Index) sortPartitions() { + slices.SortFunc(i.allPartitions, func(a, b *PartitionMeta) int { + return a.compare(b) + }) +} + +func (i *Index) collectTenantBlocks(p *indexPartition, start, end int64) []*metastorev1.BlockMeta { + blocks := make([]*metastorev1.BlockMeta, 0) + for _, s := range p.shards { + for _, block := range s.blocks { + if start < block.MaxTime && end >= block.MinTime { + blocks = append(blocks, block) + } + } + } + return blocks +} + +// ReplaceBlocks removes source blocks from the index and inserts replacement blocks into the index. The intended usage +// is for block compaction. The replacement blocks could be added to the same or a different partition. +func (i *Index) ReplaceBlocks(sources []string, sourceShard uint32, sourceTenant string, replacements []*metastorev1.BlockMeta) { + i.partitionMu.Lock() + defer i.partitionMu.Unlock() + + for _, newBlock := range replacements { + i.insertBlock(newBlock) + } + + for _, sourceBlock := range sources { + i.deleteBlock(sourceShard, sourceTenant, sourceBlock) + } +} + +// deleteBlock deletes a block from the index. It is the caller's responsibility to enforce safe concurrent access. +func (i *Index) deleteBlock(shard uint32, tenant string, blockId string) { + // first try the currently mapped partition + key := i.CreatePartitionKey(blockId) + if ok := i.tryDelete(key, shard, tenant, blockId); ok { + return + } + + // now try all other possible partitions + t := ulid.Time(ulid.MustParse(blockId).Time()).UTC().UnixMilli() + + for _, p := range i.allPartitions { + if p.contains(t) { + if ok := i.tryDelete(p.Key, shard, tenant, blockId); ok { + return + } + } + } +} + +func (i *Index) tryDelete(key PartitionKey, shard uint32, tenant string, blockId string) bool { + meta := i.findPartitionMeta(key) + if meta == nil { + return false + } + + cKey := cacheKey{ + partitionKey: key, + tenant: tenant, + } + p, ok := i.loadedPartitions[cKey] + if !ok { + return false + } + + s, ok := p.shards[shard] + if !ok { + return false + } + + if s.blocks[blockId] != nil { + delete(s.blocks, blockId) + return true + } + + return false +} + +func (i *Index) FindPartitionMetas(blockId string) []*PartitionMeta { + i.partitionMu.Lock() + defer i.partitionMu.Unlock() + ts := ulid.Time(ulid.MustParse(blockId).Time()).UTC().UnixMilli() + + metas := make([]*PartitionMeta, 0) + for _, p := range i.allPartitions { + if p.contains(ts) { + metas = append(metas, p) + } + } + return metas +} + +func (i *Index) unloadPartitions() { + tenantPartitions := make(map[string][]*indexPartition) + excessPerTenant := make(map[string]int) + for k, p := range i.loadedPartitions { + tenantPartitions[k.tenant] = append(tenantPartitions[k.tenant], p) + if len(tenantPartitions[k.tenant]) > i.Config.PartitionCacheSize { + excessPerTenant[k.tenant]++ + } + } + + for t, partitions := range tenantPartitions { + toRemove, ok := excessPerTenant[t] + if !ok { + continue + } + slices.SortFunc(partitions, func(a, b *indexPartition) int { + return a.accessedAt.Compare(b.accessedAt) + }) + level.Debug(i.logger).Log("msg", "unloading metastore index partitions", "tenant", t, "to_remove", len(partitions)) + for _, p := range partitions { + if p.meta.contains(time.Now().UTC().UnixMilli()) { + continue + } + 
level.Debug(i.logger).Log("unloading metastore index partition", "key", p.meta.Key, "accessed_at", p.accessedAt.Format(time.RFC3339)) + cKey := cacheKey{ + partitionKey: p.meta.Key, + tenant: t, + } + delete(i.loadedPartitions, cKey) + toRemove-- + if toRemove == 0 { + break + } + } + } +} diff --git a/pkg/experiment/metastore/index/index_test.go b/pkg/experiment/metastore/index/index_test.go new file mode 100644 index 0000000000..0682502b99 --- /dev/null +++ b/pkg/experiment/metastore/index/index_test.go @@ -0,0 +1,367 @@ +package index_test + +import ( + "context" + "crypto/rand" + "sync" + "testing" + "time" + + "github.com/oklog/ulid" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore/index" + "github.com/grafana/pyroscope/pkg/test/mocks/mockindex" + "github.com/grafana/pyroscope/pkg/util" +) + +func TestIndex_FindBlocksInRange(t *testing.T) { + tests := []struct { + name string + blocks []*metastorev1.BlockMeta + queryStart int64 + queryEnd int64 + want int + }{ + { + name: "matching blocks", + blocks: []*metastorev1.BlockMeta{ + createBlock("20240923T06.1h", 0), + createBlock("20240923T07.1h", 0), + createBlock("20240923T08.1h", 0), + createBlock("20240923T09.1h", 0), + createBlock("20240923T10.1h", 0), + }, + queryStart: createTime("2024-09-23T08:00:00.000Z"), + queryEnd: createTime("2024-09-23T09:00:00.000Z"), + want: 2, + }, + { + name: "no matching blocks", + blocks: []*metastorev1.BlockMeta{ + createBlock("20240923T06.1h", 0), + createBlock("20240923T07.1h", 0), + createBlock("20240923T08.1h", 0), + createBlock("20240923T09.1h", 0), + createBlock("20240923T10.1h", 0), + }, + queryStart: createTime("2024-09-23T04:00:00.000Z"), + queryEnd: createTime("2024-09-23T05:00:00.000Z"), + want: 0, + }, + { + name: "out of order ingestion (behind on time)", + blocks: []*metastorev1.BlockMeta{ + createBlock("20240923T06.1h", 0), + createBlock("20240923T07.1h", -1*time.Hour), // in range + createBlock("20240923T07.1h", -2*time.Hour), // in range + createBlock("20240923T07.1h", -3*time.Hour), // too old + createBlock("20240923T08.1h", -3*time.Hour), // // technically in range but we will not look here + createBlock("20240923T10.1h", 0), + }, + queryStart: createTime("2024-09-23T05:00:00.000Z"), + queryEnd: createTime("2024-09-23T06:00:00.000Z"), + want: 3, + }, + { + name: "out of order ingestion (ahead of time)", + blocks: []*metastorev1.BlockMeta{ + createBlock("20240923T06.1h", 2*time.Hour), // technically in range but we will not look here + createBlock("20240923T07.1h", 1*time.Hour), // in range + createBlock("20240923T07.1h", 3*time.Hour), // too new + createBlock("20240923T08.1h", 0), // in range + createBlock("20240923T08.1h", 1*time.Hour), // in range + createBlock("20240923T10.1h", 0), + }, + queryStart: createTime("2024-09-23T08:00:00.000Z"), + queryEnd: createTime("2024-09-23T09:00:00.000Z"), + want: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + store := mockindex.NewMockStore(t) + store.On("ListShards", mock.Anything).Return([]uint32{}) + i := index.NewIndex(store, util.Logger, &index.Config{ + PartitionDuration: time.Hour, + PartitionCacheSize: 24, + QueryLookaroundPeriod: time.Hour, + }) + for _, b := range tt.blocks { + i.InsertBlock(b) + } + tenantMap := map[string]struct{}{"tenant-1": {}} + found, err := 
i.FindBlocksInRange(tt.queryStart, tt.queryEnd, tenantMap) + require.NoError(t, err) + require.Equal(t, tt.want, len(found)) + for _, b := range found { + require.Truef( + t, + tt.queryStart < b.MaxTime && tt.queryEnd >= b.MinTime, + "block %s is not in range, %v : %v", b.Id, time.UnixMilli(b.MinTime).UTC(), time.UnixMilli(b.MaxTime).UTC()) + } + }) + } + +} + +func mockPartition(store *mockindex.MockStore, key index.PartitionKey, blocks []*metastorev1.BlockMeta) { + store.On("ListShards", key).Return([]uint32{0}).Maybe() + store.On("ListTenants", key, uint32(0)).Return([]string{""}).Maybe() + store.On("ListBlocks", key, uint32(0), "").Return(blocks).Maybe() +} + +func TestIndex_ForEachPartition(t *testing.T) { + store := mockindex.NewMockStore(t) + i := index.NewIndex(store, util.Logger, &index.Config{PartitionDuration: time.Hour}) + + keys := []index.PartitionKey{ + "20240923T06.1h", + "20240923T07.1h", + "20240923T08.1h", + "20240923T09.1h", + "20240923T10.1h", + } + store.On("ListPartitions").Return(keys) + for _, key := range keys { + mockPartition(store, key, nil) + } + i.LoadPartitions() + + visited := make(map[index.PartitionKey]struct{}) + var mu sync.Mutex + err := i.ForEachPartition(context.Background(), func(meta *index.PartitionMeta) error { + mu.Lock() + visited[meta.Key] = struct{}{} + mu.Unlock() + return nil + }) + require.NoError(t, err) + + require.Len(t, visited, 5) +} + +func TestIndex_GetPartitionKey(t *testing.T) { + tests := []struct { + name string + duration time.Duration + blockId string + want index.PartitionKey + }{ + { + name: "1d", + duration: createDuration("1d"), + blockId: createUlidString("2024-07-15T16:13:43.245Z"), + want: index.PartitionKey("20240715.1d"), + }, + { + name: "1h at start of the window", + duration: createDuration("1h"), + blockId: createUlidString("2024-07-15T16:00:00.000Z"), + want: index.PartitionKey("20240715T16.1h"), + }, + { + name: "1h in the middle of the window", + duration: createDuration("1h"), + blockId: createUlidString("2024-07-15T16:13:43.245Z"), + want: index.PartitionKey("20240715T16.1h"), + }, + { + name: "1h at the end of the window", + duration: createDuration("1h"), + blockId: createUlidString("2024-07-15T16:59:59.999Z"), + want: index.PartitionKey("20240715T16.1h"), + }, + { + name: "6h duration at midnight", + duration: createDuration("6h"), + blockId: createUlidString("2024-07-15T00:00:00.000Z"), + want: index.PartitionKey("20240715T00.6h"), + }, + { + name: "6h at the middle of a window", + duration: createDuration("6h"), + blockId: createUlidString("2024-07-15T15:13:43.245Z"), + want: index.PartitionKey("20240715T12.6h"), + }, + { + name: "6h at the end of the window", + duration: createDuration("6h"), + blockId: createUlidString("2024-07-15T23:59:59.999Z"), + want: index.PartitionKey("20240715T18.6h"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + i := index.NewIndex(mockindex.NewMockStore(t), util.Logger, &index.Config{PartitionDuration: tt.duration}) + assert.Equalf(t, tt.want, i.CreatePartitionKey(tt.blockId), "CreatePartitionKey(%v)", tt.blockId) + }) + } +} + +func TestIndex_InsertBlock(t *testing.T) { + store := mockindex.NewMockStore(t) + store.On("ListShards", mock.Anything).Return([]uint32{}) + i := index.NewIndex(store, util.Logger, &index.Config{PartitionDuration: time.Hour, PartitionCacheSize: 1}) + block := &metastorev1.BlockMeta{ + Id: createUlidString("2024-09-23T08:00:00.123Z"), + TenantId: "tenant-1", + MinTime: createTime("2024-09-23T08:00:00.000Z"), + MaxTime: 
createTime("2024-09-23T08:05:00.000Z"), + } + + i.InsertBlock(block) + require.NotNil(t, i.FindBlock(0, "tenant-1", block.Id)) + blocks, err := i.FindBlocksInRange(createTime("2024-09-23T07:00:00.000Z"), createTime("2024-09-23T09:00:00.000Z"), map[string]struct{}{"tenant-1": {}}) + require.NoError(t, err) + require.Len(t, blocks, 1) + require.Equal(t, block, blocks[0]) + + // inserting the block again is a noop + i.InsertBlock(block) + blocks, err = i.FindBlocksInRange(createTime("2024-09-23T07:00:00.000Z"), createTime("2024-09-23T09:00:00.000Z"), map[string]struct{}{"tenant-1": {}}) + require.NoError(t, err) + require.Len(t, blocks, 1) + require.Equal(t, block, blocks[0]) +} + +func TestIndex_LoadPartitions(t *testing.T) { + store := mockindex.NewMockStore(t) + i := index.NewIndex(store, util.Logger, &index.Config{PartitionDuration: time.Hour, PartitionCacheSize: 1}) + + blocks := make([]*metastorev1.BlockMeta, 0, 420) + for i := 0; i < 420; i++ { + block := &metastorev1.BlockMeta{ + Id: ulid.MustNew(ulid.Now(), rand.Reader).String(), + Shard: 0, + } + blocks = append(blocks, block) + } + + partitionKey := i.CreatePartitionKey(blocks[0].Id) + store.On("ListPartitions").Return([]index.PartitionKey{partitionKey}) + store.On("ListShards", mock.Anything).Return([]uint32{0}) + store.On("ListTenants", mock.Anything, mock.Anything).Return([]string{""}) + store.On("ListBlocks", mock.Anything, mock.Anything, mock.Anything).Return(blocks) + + // restore from store + i.LoadPartitions() + + for _, b := range blocks { + require.NotNilf(t, i.FindBlock(b.Shard, b.TenantId, b.Id), "block %s not found", b.Id) + } +} + +func TestIndex_ReplaceBlocks(t *testing.T) { + store := mockindex.NewMockStore(t) + store.On("ListShards", mock.Anything).Return([]uint32{}) + i := index.NewIndex(store, util.Logger, &index.DefaultConfig) + b1 := &metastorev1.BlockMeta{ + Id: createUlidString("2024-09-23T08:00:00.123Z"), + } + i.InsertBlock(b1) + b2 := &metastorev1.BlockMeta{ + Id: createUlidString("2024-09-23T08:00:00.123Z"), + } + i.InsertBlock(b2) + + replacement := &metastorev1.BlockMeta{ + Id: createUlidString("2024-09-23T08:00:00.123Z"), + CompactionLevel: 1, + TenantId: "tenant-1", + } + i.ReplaceBlocks([]string{b1.Id, b2.Id}, 0, "", []*metastorev1.BlockMeta{replacement}) + + require.Nil(t, i.FindBlock(0, "", b1.Id)) + require.Nil(t, i.FindBlock(0, "", b2.Id)) + require.NotNil(t, i.FindBlock(0, "tenant-1", replacement.Id)) +} + +func TestIndex_DurationChange(t *testing.T) { + store := mockindex.NewMockStore(t) + store.On("ListShards", mock.Anything).Return([]uint32{}) + i := index.NewIndex(store, util.Logger, &index.Config{PartitionDuration: 24 * time.Hour, PartitionCacheSize: 1}) + b := &metastorev1.BlockMeta{ + Id: createUlidString("2024-09-23T08:00:00.123Z"), + } + i.InsertBlock(b) + require.NotNil(t, i.FindBlock(0, "", b.Id)) + + i.Config.PartitionDuration = time.Hour + require.NotNil(t, i.FindBlock(0, "", b.Id)) +} + +func TestIndex_UnloadPartitions(t *testing.T) { + store := mockindex.NewMockStore(t) + i := index.NewIndex(store, util.Logger, &index.Config{PartitionDuration: time.Hour, PartitionCacheSize: 3}) + + keys := []index.PartitionKey{ + "20240923T06.1h", + "20240923T07.1h", + "20240923T08.1h", + "20240923T09.1h", + "20240923T10.1h", + } + store.On("ListPartitions").Return(keys) + for _, key := range keys { + mockPartition(store, key, nil) + } + i.LoadPartitions() + require.True(t, store.AssertNumberOfCalls(t, "ListShards", 5)) + + for _, key := range keys { + start, _, _ := key.Parse() + for c := 0; c < 
10; c++ { + _, err := i.FindBlocksInRange(start.UnixMilli(), start.Add(5*time.Minute).UnixMilli(), map[string]struct{}{"": {}}) + require.NoError(t, err) + } + } + // multiple reads cause a single store access + require.True(t, store.AssertNumberOfCalls(t, "ListShards", 10)) + + for c := 0; c < 10; c++ { + _, err := i.FindBlocksInRange(createTime("2024-09-23T08:00:00.000Z"), createTime("2024-09-23T08:05:00.000Z"), map[string]struct{}{"": {}}) + require.NoError(t, err) + } + // this partition is still loaded in memory + require.True(t, store.AssertNumberOfCalls(t, "ListShards", 10)) + + for c := 0; c < 10; c++ { + _, err := i.FindBlocksInRange(createTime("2024-09-23T06:00:00.000Z"), createTime("2024-09-23T06:05:00.000Z"), map[string]struct{}{"": {}}) + require.NoError(t, err) + } + // this partition was unloaded + require.True(t, store.AssertNumberOfCalls(t, "ListShards", 11)) +} + +func createUlidString(t string) string { + parsed, _ := time.Parse(time.RFC3339, t) + l := ulid.MustNew(ulid.Timestamp(parsed), rand.Reader) + return l.String() +} + +func createDuration(d string) time.Duration { + parsed, _ := model.ParseDuration(d) + return time.Duration(parsed) +} + +func createTime(t string) int64 { + ts, _ := time.Parse(time.RFC3339, t) + return ts.UnixMilli() +} + +func createBlock(key string, offset time.Duration) *metastorev1.BlockMeta { + pKey := index.PartitionKey(key) + ts, _, _ := pKey.Parse() + return &metastorev1.BlockMeta{ + Id: createUlidString(ts.Format(time.RFC3339)), + MinTime: ts.Add(offset).UnixMilli(), + MaxTime: ts.Add(offset).Add(5 * time.Minute).UnixMilli(), + TenantId: "tenant-1", + } +} diff --git a/pkg/experiment/metastore/index/partition_key.go b/pkg/experiment/metastore/index/partition_key.go new file mode 100644 index 0000000000..2b0c00b22c --- /dev/null +++ b/pkg/experiment/metastore/index/partition_key.go @@ -0,0 +1,37 @@ +package index + +import ( + "fmt" + "strings" + "time" + + "github.com/prometheus/common/model" +) + +const ( + dayLayout = "20060102" + hourLayout = "20060102T15" +) + +func getTimeLayout(d model.Duration) string { + if time.Duration(d) >= 24*time.Hour { + return dayLayout + } else { + return hourLayout + } +} + +type PartitionKey string + +func (k PartitionKey) Parse() (t time.Time, d time.Duration, err error) { + parts := strings.Split(string(k), ".") + if len(parts) != 2 { + return time.Time{}, 0, fmt.Errorf("invalid partition key: %s", k) + } + mDur, err := model.ParseDuration(parts[1]) + if err != nil { + return time.Time{}, 0, fmt.Errorf("invalid duration in partition key: %s", k) + } + t, err = time.Parse(getTimeLayout(mDur), parts[0]) + return t, time.Duration(mDur), err +} diff --git a/pkg/experiment/metastore/index/partition_meta.go b/pkg/experiment/metastore/index/partition_meta.go new file mode 100644 index 0000000000..08f6cbfb48 --- /dev/null +++ b/pkg/experiment/metastore/index/partition_meta.go @@ -0,0 +1,62 @@ +package index + +import ( + "time" +) + +type PartitionMeta struct { + Key PartitionKey + Ts time.Time + Duration time.Duration + Tenants []string + + tenantMap map[string]struct{} +} + +func (m *PartitionMeta) HasTenant(tenant string) bool { + m.loadTenants() + _, ok := m.tenantMap[tenant] + return ok +} + +func (m *PartitionMeta) StartTime() time.Time { + return m.Ts +} + +func (m *PartitionMeta) EndTime() time.Time { + return m.Ts.Add(m.Duration) +} + +func (m *PartitionMeta) loadTenants() { + if len(m.Tenants) > 0 && len(m.tenantMap) == 0 { + m.tenantMap = make(map[string]struct{}, len(m.Tenants)) + for _, t := range 
m.Tenants { + m.tenantMap[t] = struct{}{} + } + } +} + +func (m *PartitionMeta) AddTenant(tenant string) { + m.loadTenants() + if _, ok := m.tenantMap[tenant]; !ok { + m.tenantMap[tenant] = struct{}{} + m.Tenants = append(m.Tenants, tenant) + } +} + +func (m *PartitionMeta) compare(other *PartitionMeta) int { + if m == other { + return 0 + } + return m.Ts.Compare(other.Ts) +} + +// [ m.StartTime(), m.EndTime() ) +func (m *PartitionMeta) overlaps(start, end int64) bool { + return start < m.EndTime().UnixMilli() && end >= m.StartTime().UnixMilli() +} + +// [ m.StartTime(), m.EndTime() ) +func (m *PartitionMeta) contains(t int64) bool { + return t >= m.StartTime().UnixMilli() && t < m.EndTime().UnixMilli() +} diff --git a/pkg/experiment/metastore/index/partition_meta_test.go b/pkg/experiment/metastore/index/partition_meta_test.go new file mode 100644 index 0000000000..3177bf6a7e --- /dev/null +++ b/pkg/experiment/metastore/index/partition_meta_test.go @@ -0,0 +1,104 @@ +package index + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPartitionMeta_overlaps(t *testing.T) { + type args struct { + start time.Time + end time.Time + } + tests := []struct { + name string + meta PartitionMeta + args args + want bool + }{ + { + name: "simple overlap", + meta: PartitionMeta{Ts: createTime("2024-09-11T06:00:00.000Z"), Duration: 6 * time.Hour}, + args: args{ + start: createTime("2024-09-11T07:15:24.123Z"), + end: createTime("2024-09-11T13:15:24.123Z"), + }, + want: true, + }, + { + name: "overlap at partition start", + meta: PartitionMeta{Ts: createTime("2024-09-11T06:00:00.000Z"), Duration: 6 * time.Hour}, + args: args{ + start: createTime("2024-09-11T04:00:00.000Z"), + end: createTime("2024-09-11T06:00:00.000Z"), + }, + want: true, + }, + { + name: "no overlap close to partition start", + meta: PartitionMeta{Ts: createTime("2024-09-11T06:00:00.000Z"), Duration: 6 * time.Hour}, + args: args{ + start: createTime("2024-09-11T04:00:00.000Z"), + end: createTime("2024-09-11T05:59:59.999Z"), + }, + want: false, + }, + { + name: "overlap at partition end", + meta: PartitionMeta{Ts: createTime("2024-09-11T06:00:00.000Z"), Duration: 6 * time.Hour}, + args: args{ + start: createTime("2024-09-11T11:59:59.999Z"), + end: createTime("2024-09-11T13:00:00.000Z"), + }, + want: true, + }, + { + name: "no overlap close to partition end", + meta: PartitionMeta{Ts: createTime("2024-09-11T06:00:00.000Z"), Duration: 6 * time.Hour}, + args: args{ + start: createTime("2024-09-11T12:00:00.000Z"), + end: createTime("2024-09-11T13:59:59.999Z"), + }, + want: false, + }, + { + name: "overlap around midnight", + meta: PartitionMeta{Ts: createTime("2024-09-11T00:00:00.000Z"), Duration: 6 * time.Hour}, + args: args{ + start: createTime("2024-09-10T19:00:00.000Z"), + end: createTime("2024-09-11T00:01:01.999Z"), + }, + want: true, + }, + { + name: "partition fully contains interval", + meta: PartitionMeta{Ts: createTime("2024-09-11T06:00:00.000Z"), Duration: 6 * time.Hour}, + args: args{ + start: createTime("2024-09-11T07:00:00.000Z"), + end: createTime("2024-09-11T08:01:01.999Z"), + }, + want: true, + }, + { + name: "interval fully contains partition", + meta: PartitionMeta{Ts: createTime("2024-09-11T06:00:00.000Z"), Duration: 6 * time.Hour}, + args: args{ + start: createTime("2024-09-11T02:00:00.000Z"), + end: createTime("2024-09-11T13:01:01.999Z"), + }, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, 
tt.meta.overlaps(tt.args.start.UnixMilli(), tt.args.end.UnixMilli()), "overlaps(%v, %v)", tt.args.start, tt.args.end) + }) + } +} + +func createTime(t string) time.Time { + ts, _ := time.Parse(time.RFC3339, t) + return ts +} diff --git a/pkg/experiment/metastore/metastore.go b/pkg/experiment/metastore/metastore.go index 9a339e36f6..50f416c120 100644 --- a/pkg/experiment/metastore/metastore.go +++ b/pkg/experiment/metastore/metastore.go @@ -4,14 +4,11 @@ import ( "context" "flag" "fmt" - "github.com/grafana/pyroscope/pkg/experiment/metastore/dlq" - "github.com/thanos-io/objstore" "net" "os" "path/filepath" "strings" - "sync" "time" "github.com/go-kit/log" @@ -24,10 +21,13 @@ import ( raftwal "github.com/hashicorp/raft-wal" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" metastoreclient "github.com/grafana/pyroscope/pkg/experiment/metastore/client" + "github.com/grafana/pyroscope/pkg/experiment/metastore/dlq" + "github.com/grafana/pyroscope/pkg/experiment/metastore/index" "github.com/grafana/pyroscope/pkg/experiment/metastore/raftleader" ) @@ -52,6 +52,7 @@ type Config struct { Compaction CompactionConfig `yaml:"compaction_config"` MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` DLQRecoveryPeriod time.Duration `yaml:"dlq_recovery_period" category:"advanced"` + Index index.Config `yaml:"index_config"` } type RaftConfig struct { @@ -76,6 +77,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.DLQRecoveryPeriod, prefix+"dlq-recovery-period", 15*time.Second, "Period for DLQ recovery loop.") cfg.Raft.RegisterFlagsWithPrefix(prefix+"raft.", f) cfg.Compaction.RegisterFlagsWithPrefix(prefix+"compaction.", f) + cfg.Index.RegisterFlagsWithPrefix(prefix+"index.", f) } func (cfg *Config) Validate() error { @@ -133,8 +135,6 @@ type Metastore struct { walDir string - done chan struct{} - wg sync.WaitGroup metrics *metastoreMetrics client *metastoreclient.Client readySince time.Time @@ -153,12 +153,11 @@ func New(config Config, limits Limits, logger log.Logger, reg prometheus.Registe reg: reg, limits: limits, db: newDB(config, logger, metrics), - done: make(chan struct{}), metrics: metrics, client: client, } m.leaderhealth = raftleader.NewRaftLeaderHealthObserver(logger, raftleader.NewMetrics(reg)) - m.state = newMetastoreState(logger, m.db, m.reg, &config.Compaction) + m.state = newMetastoreState(logger, m.db, m.reg, &config.Compaction, &config.Index) m.dlq = dlq.NewRecovery(dlq.RecoveryConfig{ Period: config.DLQRecoveryPeriod, }, logger, m, bucket) @@ -175,7 +174,7 @@ func (m *Metastore) Shutdown() error { return nil } -func (m *Metastore) starting(context.Context) error { +func (m *Metastore) starting(ctx context.Context) error { if err := m.db.open(false); err != nil { return fmt.Errorf("failed to initialize database: %w", err) } @@ -183,14 +182,10 @@ func (m *Metastore) starting(context.Context) error { return fmt.Errorf("failed to initialize raft: %w", err) } m.dlq.Start() - m.wg.Add(1) - go m.cleanupLoop() return nil } func (m *Metastore) stopping(_ error) error { - close(m.done) - m.wg.Wait() return m.Shutdown() } diff --git a/pkg/experiment/metastore/metastore_block_store.go b/pkg/experiment/metastore/metastore_block_store.go new file mode 100644 index 0000000000..bfc75367b8 --- /dev/null +++ 
b/pkg/experiment/metastore/metastore_block_store.go @@ -0,0 +1,190 @@ +package metastore + +import ( + "encoding/binary" + "fmt" + "slices" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "go.etcd.io/bbolt" + + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore/index" +) + +type indexStore struct { + db *boltdb + logger log.Logger +} + +func newIndexStore(db *boltdb, logger log.Logger) index.Store { + return &indexStore{ + db: db, + logger: logger, + } +} + +const ( + partitionBucketName = "partition" + emptyTenantBucketName = "-" +) + +var partitionBucketNameBytes = []byte(partitionBucketName) +var emptyTenantBucketNameBytes = []byte(emptyTenantBucketName) + +func getPartitionBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { + bkt := tx.Bucket(partitionBucketNameBytes) + if bkt == nil { + return nil, bbolt.ErrBucketNotFound + } + return bkt, nil +} + +func (m *indexStore) ListPartitions() []index.PartitionKey { + partitionKeys := make([]index.PartitionKey, 0) + err := m.db.boltdb.View(func(tx *bbolt.Tx) error { + bkt, err := getPartitionBucket(tx) + if err != nil { + return errors.Wrap(err, "root partition bucket missing") + } + err = bkt.ForEachBucket(func(name []byte) error { + partitionKeys = append(partitionKeys, index.PartitionKey(name)) + return nil + }) + return err + }) + if err != nil { + level.Error(m.logger).Log("msg", "error listing partitions", "err", err) + panic(err) + } + return partitionKeys +} + +func (m *indexStore) ListShards(key index.PartitionKey) []uint32 { + shards := make([]uint32, 0) + err := m.db.boltdb.View(func(tx *bbolt.Tx) error { + bkt, err := getPartitionBucket(tx) + if err != nil { + return errors.Wrap(err, "root partition bucket missing") + } + partBkt := bkt.Bucket([]byte(key)) + if partBkt == nil { + return nil + } + return partBkt.ForEachBucket(func(name []byte) error { + shards = append(shards, binary.BigEndian.Uint32(name)) + return nil + }) + }) + if err != nil { + level.Error(m.logger).Log("msg", "error listing shards", "partition", key, "err", err) + panic(err) + } + return shards +} + +func (m *indexStore) ListTenants(key index.PartitionKey, shard uint32) []string { + tenants := make([]string, 0) + err := m.db.boltdb.View(func(tx *bbolt.Tx) error { + bkt, err := getPartitionBucket(tx) + if err != nil { + return errors.Wrap(err, "root partition bucket missing") + } + partBkt := bkt.Bucket([]byte(key)) + if partBkt == nil { + return nil + } + shardBktName := make([]byte, 4) + binary.BigEndian.PutUint32(shardBktName, shard) + shardBkt := partBkt.Bucket(shardBktName) + if shardBkt == nil { + return nil + } + return shardBkt.ForEachBucket(func(name []byte) error { + if slices.Equal(name, emptyTenantBucketNameBytes) { + tenants = append(tenants, "") + } else { + tenants = append(tenants, string(name)) + } + return nil + }) + }) + if err != nil { + level.Error(m.logger).Log("msg", "error listing tenants", "partition", key, "shard", shard, "err", err) + panic(err) + } + return tenants +} + +func (m *indexStore) ListBlocks(key index.PartitionKey, shard uint32, tenant string) []*metastorev1.BlockMeta { + blocks := make([]*metastorev1.BlockMeta, 0) + err := m.db.boltdb.View(func(tx *bbolt.Tx) error { + bkt, err := getPartitionBucket(tx) + if err != nil { + return errors.Wrap(err, "root partition bucket missing") + } + partBkt := bkt.Bucket([]byte(key)) + if partBkt == nil { + return nil + } + shardBktName := make([]byte, 4) + 
binary.BigEndian.PutUint32(shardBktName, shard) + shardBkt := partBkt.Bucket(shardBktName) + if shardBkt == nil { + return nil + } + tenantBktName := []byte(tenant) + if len(tenantBktName) == 0 { + tenantBktName = emptyTenantBucketNameBytes + } + tenantBkt := shardBkt.Bucket(tenantBktName) + if tenantBkt == nil { + return nil + } + return tenantBkt.ForEach(func(k, v []byte) error { + var md metastorev1.BlockMeta + if err := md.UnmarshalVT(v); err != nil { + return fmt.Errorf("failed to unmarshal block %q: %w", string(k), err) + } + blocks = append(blocks, &md) + return nil + }) + }) + if err != nil { + level.Error(m.logger).Log("msg", "error listing blocks", "partition", key, "shard", shard, "tenant", tenant, "err", err) + panic(err) + } + return blocks +} + +func updateBlockMetadataBucket(tx *bbolt.Tx, partKey index.PartitionKey, shard uint32, tenant string, fn func(*bbolt.Bucket) error) error { + bkt, err := getPartitionBucket(tx) + if err != nil { + return errors.Wrap(err, "root partition bucket missing") + } + + partBkt, err := getOrCreateSubBucket(bkt, []byte(partKey)) + if err != nil { + return errors.Wrapf(err, "error creating partition bucket for %s", partKey) + } + + shardBktName := make([]byte, 4) + binary.BigEndian.PutUint32(shardBktName, shard) + shardBkt, err := getOrCreateSubBucket(partBkt, shardBktName) + if err != nil { + return errors.Wrapf(err, "error creating shard bucket for partiton %s and shard %d", partKey, shard) + } + + tenantBktName := []byte(tenant) + if len(tenantBktName) == 0 { + tenantBktName = emptyTenantBucketNameBytes + } + tenantBkt, err := getOrCreateSubBucket(shardBkt, tenantBktName) + if err != nil { + return errors.Wrapf(err, "error creating tenant bucket for partition %s, shard %d and tenant %s", partKey, shard, tenant) + } + + return fn(tenantBkt) +} diff --git a/pkg/experiment/metastore/metastore_boltdb.go b/pkg/experiment/metastore/metastore_boltdb.go index 4bd3d9c022..10190f04e1 100644 --- a/pkg/experiment/metastore/metastore_boltdb.go +++ b/pkg/experiment/metastore/metastore_boltdb.go @@ -71,7 +71,7 @@ func (db *boltdb) open(readOnly bool) (err error) { if !readOnly { err = db.boltdb.Update(func(tx *bbolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(blockMetadataBucketNameBytes) + _, err := tx.CreateBucketIfNotExists(partitionBucketNameBytes) if err != nil { return err } @@ -216,41 +216,12 @@ func getOrCreateSubBucket(parent *bbolt.Bucket, name []byte) (*bbolt.Bucket, err return bucket, nil } -const blockMetadataBucketName = "block_metadata" -const compactionJobBucketName = "compaction_job" +const ( + compactionJobBucketName = "compaction_job" +) -var blockMetadataBucketNameBytes = []byte(blockMetadataBucketName) var compactionJobBucketNameBytes = []byte(compactionJobBucketName) -func getBlockMetadataBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) { - mdb := tx.Bucket(blockMetadataBucketNameBytes) - if mdb == nil { - return nil, bbolt.ErrBucketNotFound - } - return mdb, nil -} - -func updateBlockMetadataBucket(tx *bbolt.Tx, name []byte, fn func(*bbolt.Bucket) error) error { - mdb, err := getBlockMetadataBucket(tx) - if err != nil { - return err - } - bucket, err := getOrCreateSubBucket(mdb, name) - if err != nil { - return err - } - return fn(bucket) -} - -// Bucket |Key -// [4:shard]|[block_id] -func keyForBlockMeta(shard uint32, tenant string, id string) (bucket, key []byte) { - k := make([]byte, 4+len(tenant)) - binary.BigEndian.PutUint32(k, shard) - copy(k[4:], tenant) - return k, []byte(id) -} - func parseBucketName(b []byte) (shard 
uint32, tenant string, ok bool) { if len(b) >= 4 { return binary.BigEndian.Uint32(b), string(b[4:]), true diff --git a/pkg/experiment/metastore/metastore_compaction_planner_test.go b/pkg/experiment/metastore/metastore_compaction_planner_test.go index 6c246034b6..4edf5c099a 100644 --- a/pkg/experiment/metastore/metastore_compaction_planner_test.go +++ b/pkg/experiment/metastore/metastore_compaction_planner_test.go @@ -1,29 +1,31 @@ package metastore import ( - "fmt" + "crypto/rand" "testing" "time" + "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore/index" "github.com/grafana/pyroscope/pkg/util" ) func Test_MaintainSeparateBlockQueues(t *testing.T) { m := initState(t) _ = m.db.boltdb.Update(func(tx *bbolt.Tx) error { - _ = m.compactBlock(createBlock(1, 0, "", 0), tx, 0) - _ = m.compactBlock(createBlock(2, 0, "", 0), tx, 0) - _ = m.compactBlock(createBlock(3, 0, "", 0), tx, 0) - _ = m.compactBlock(createBlock(4, 1, "", 0), tx, 0) - _ = m.compactBlock(createBlock(5, 1, "", 0), tx, 0) - _ = m.compactBlock(createBlock(6, 1, "tenant1", 1), tx, 0) - _ = m.compactBlock(createBlock(7, 1, "tenant2", 1), tx, 0) - _ = m.compactBlock(createBlock(8, 1, "tenant1", 1), tx, 0) + _ = m.compactBlock(createBlock(0, "", 0), tx, 0) + _ = m.compactBlock(createBlock(0, "", 0), tx, 0) + _ = m.compactBlock(createBlock(0, "", 0), tx, 0) + _ = m.compactBlock(createBlock(1, "", 0), tx, 0) + _ = m.compactBlock(createBlock(1, "", 0), tx, 0) + _ = m.compactBlock(createBlock(1, "tenant1", 1), tx, 0) + _ = m.compactBlock(createBlock(1, "tenant2", 1), tx, 0) + _ = m.compactBlock(createBlock(1, "tenant1", 1), tx, 0) return nil }) require.Equal(t, 3, getQueueLen(m, 0, "", 0)) @@ -37,7 +39,7 @@ func Test_CreateJobs(t *testing.T) { m := initState(t) _ = m.db.boltdb.Update(func(tx *bbolt.Tx) error { for i := 0; i < 420; i++ { - _ = m.compactBlock(createBlock(i, i%4, "", 0), tx, 0) + _ = m.compactBlock(createBlock(i%4, "", 0), tx, 0) } return nil }) @@ -63,14 +65,14 @@ func initState(tb testing.TB) *metastoreState { err := db.open(false) require.NoError(tb, err) - m := newMetastoreState(util.Logger, db, reg, &config.Compaction) + m := newMetastoreState(util.Logger, db, reg, &config.Compaction, &index.DefaultConfig) require.NotNil(tb, m) return m } -func createBlock(id int, shard int, tenant string, level int) *metastorev1.BlockMeta { +func createBlock(shard int, tenant string, level int) *metastorev1.BlockMeta { return &metastorev1.BlockMeta{ - Id: fmt.Sprintf("b-%d", id), + Id: ulid.MustNew(ulid.Now(), rand.Reader).String(), Shard: uint32(shard), TenantId: tenant, CompactionLevel: uint32(level), @@ -85,7 +87,7 @@ func getQueueLen(m *metastoreState, shard int, tenant string, level int) int { } func verifyCompactionState(t *testing.T, m *metastoreState) { - stateFromDb := newMetastoreState(util.Logger, m.db, prometheus.DefaultRegisterer, m.compactionConfig) + stateFromDb := newMetastoreState(util.Logger, m.db, prometheus.DefaultRegisterer, m.compactionConfig, &index.DefaultConfig) err := m.db.boltdb.View(func(tx *bbolt.Tx) error { return stateFromDb.restoreCompactionPlan(tx) }) diff --git a/pkg/experiment/metastore/metastore_fsm.go b/pkg/experiment/metastore/metastore_fsm.go index 81d04e9219..15e706fe2f 100644 --- a/pkg/experiment/metastore/metastore_fsm.go +++ b/pkg/experiment/metastore/metastore_fsm.go @@ -26,7 +26,6 @@ 
import ( // when the request is converted to a Raft log entry. var commandTypeMap = map[reflect.Type]raftlogpb.CommandType{ reflect.TypeOf(new(metastorev1.AddBlockRequest)): raftlogpb.CommandType_COMMAND_TYPE_ADD_BLOCK, - reflect.TypeOf(new(raftlogpb.TruncateCommand)): raftlogpb.CommandType_COMMAND_TYPE_TRUNCATE, reflect.TypeOf(new(compactorv1.PollCompactionJobsRequest)): raftlogpb.CommandType_COMMAND_TYPE_POLL_COMPACTION_JOBS_STATUS, } @@ -36,9 +35,6 @@ var commandHandlers = map[raftlogpb.CommandType]commandHandler{ raftlogpb.CommandType_COMMAND_TYPE_ADD_BLOCK: func(fsm *FSM, cmd *raft.Log, raw []byte) fsmResponse { return handleCommand(raw, cmd, fsm.state.applyAddBlock) }, - raftlogpb.CommandType_COMMAND_TYPE_TRUNCATE: func(fsm *FSM, cmd *raft.Log, raw []byte) fsmResponse { - return handleCommand(raw, cmd, fsm.state.applyTruncate) - }, raftlogpb.CommandType_COMMAND_TYPE_POLL_COMPACTION_JOBS_STATUS: func(fsm *FSM, cmd *raft.Log, raw []byte) fsmResponse { return handleCommand(raw, cmd, fsm.state.applyPollCompactionJobs) }, diff --git a/pkg/experiment/metastore/metastore_hack.go b/pkg/experiment/metastore/metastore_hack.go deleted file mode 100644 index 05cf8beb11..0000000000 --- a/pkg/experiment/metastore/metastore_hack.go +++ /dev/null @@ -1,103 +0,0 @@ -package metastore - -import ( - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/hashicorp/raft" - "github.com/oklog/ulid" - "google.golang.org/protobuf/types/known/anypb" - - "github.com/grafana/pyroscope/pkg/experiment/metastore/raftlogpb" -) - -// FIXME(kolesnikovae): -// Remove once compaction is implemented. -// Or use index instead of the timestamp. - -func (m *Metastore) cleanupLoop() { - t := time.NewTicker(10 * time.Minute) - defer func() { - t.Stop() - m.wg.Done() - }() - for { - select { - case <-m.done: - return - case <-t.C: - if m.raft.State() != raft.Leader { - continue - } - timestamp := uint64(time.Now().Add(-7 * 24 * time.Hour).UnixMilli()) - req := &raftlogpb.TruncateCommand{Timestamp: timestamp} - _, _, err := applyCommand[*raftlogpb.TruncateCommand, *anypb.Any](m.raft, req, m.config.Raft.ApplyTimeout) - if err != nil { - _ = level.Error(m.logger).Log("msg", "failed to apply truncate command", "err", err) - } - } - } -} - -func (m *metastoreState) applyTruncate(_ *raft.Log, request *raftlogpb.TruncateCommand) (*anypb.Any, error) { - m.shardsMutex.Lock() - var g sync.WaitGroup - g.Add(len(m.shards)) - for shardID, shard := range m.shards { - go truncateSegmentsBefore(m.db, m.logger, &g, shardID, shard, request.Timestamp) - } - m.shardsMutex.Unlock() - g.Wait() - return &anypb.Any{}, nil -} - -func truncateSegmentsBefore( - db *boltdb, - log log.Logger, - wg *sync.WaitGroup, - shardID uint32, - shard *metastoreShard, - t uint64, -) { - defer wg.Done() - var c int - tx, err := db.boltdb.Begin(true) - if err != nil { - _ = level.Error(log).Log("msg", "failed to start transaction", "err", err) - return - } - defer func() { - if err = tx.Commit(); err != nil { - _ = level.Error(log).Log("msg", "failed to commit transaction", "err", err) - return - } - _ = level.Info(log).Log("msg", "stale segments truncated", "segments", c) - }() - - bucket, err := getBlockMetadataBucket(tx) - if err != nil { - _ = level.Error(log).Log("msg", "failed to get metadata bucket", "err", err) - return - } - shardBucket, _ := keyForBlockMeta(shardID, "", "") - bucket = bucket.Bucket(shardBucket) - - shard.segmentsMutex.Lock() - defer shard.segmentsMutex.Unlock() - - for k, segment := range shard.segments { - 
if segment.CompactionLevel > 2 { - continue - } - if ulid.MustParse(segment.Id).Time() < t { - if err = bucket.Delete([]byte(segment.Id)); err != nil { - _ = level.Error(log).Log("msg", "failed to delete stale segments", "err", err) - return - } - delete(shard.segments, k) - c++ - } - } -} diff --git a/pkg/experiment/metastore/metastore_state.go b/pkg/experiment/metastore/metastore_state.go index 58e24529b0..20cd1e58a0 100644 --- a/pkg/experiment/metastore/metastore_state.go +++ b/pkg/experiment/metastore/metastore_state.go @@ -11,8 +11,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.etcd.io/bbolt" - metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" "github.com/grafana/pyroscope/pkg/experiment/metastore/compactionpb" + "github.com/grafana/pyroscope/pkg/experiment/metastore/index" ) const ( @@ -28,9 +28,9 @@ type metastoreState struct { logger log.Logger compactionMetrics *compactionMetrics compactionConfig *CompactionConfig + indexConfig *index.Config - shardsMutex sync.Mutex - shards map[uint32]*metastoreShard + index *index.Index compactionMutex sync.Mutex compactionJobBlockQueues map[tenantShard]*compactionJobBlockQueue @@ -39,83 +39,41 @@ type metastoreState struct { db *boltdb } -type metastoreShard struct { - segmentsMutex sync.Mutex - segments map[string]*metastorev1.BlockMeta -} - type compactionJobBlockQueue struct { mu sync.Mutex blocksByLevel map[uint32][]string } -func newMetastoreState(logger log.Logger, db *boltdb, reg prometheus.Registerer, compaction *CompactionConfig) *metastoreState { +func newMetastoreState(logger log.Logger, db *boltdb, reg prometheus.Registerer, compactionCfg *CompactionConfig, indexCfg *index.Config) *metastoreState { return &metastoreState{ logger: logger, - shards: make(map[uint32]*metastoreShard), + index: index.NewIndex(newIndexStore(db, logger), logger, indexCfg), db: db, compactionJobBlockQueues: make(map[tenantShard]*compactionJobBlockQueue), - compactionJobQueue: newJobQueue(compaction.JobLeaseDuration.Nanoseconds()), + compactionJobQueue: newJobQueue(compactionCfg.JobLeaseDuration.Nanoseconds()), compactionMetrics: newCompactionMetrics(reg), - compactionConfig: compaction, + compactionConfig: compactionCfg, + indexConfig: indexCfg, } } func (m *metastoreState) reset(db *boltdb) { - m.shardsMutex.Lock() m.compactionMutex.Lock() - clear(m.shards) clear(m.compactionJobBlockQueues) + m.index = index.NewIndex(newIndexStore(db, m.logger), m.logger, m.indexConfig) m.compactionJobQueue = newJobQueue(m.compactionConfig.JobLeaseDuration.Nanoseconds()) m.db = db - m.shardsMutex.Unlock() m.compactionMutex.Unlock() } -func (m *metastoreState) getOrCreateShard(shardID uint32) *metastoreShard { - m.shardsMutex.Lock() - defer m.shardsMutex.Unlock() - if shard, ok := m.shards[shardID]; ok { - return shard - } - shard := newMetastoreShard() - m.shards[shardID] = shard - return shard -} - func (m *metastoreState) restore(db *boltdb) error { m.reset(db) + m.index.LoadPartitions() return db.boltdb.View(func(tx *bbolt.Tx) error { - if err := m.restoreBlockMetadata(tx); err != nil { - return fmt.Errorf("failed to restore metadata entries: %w", err) - } return m.restoreCompactionPlan(tx) }) } -func (m *metastoreState) restoreBlockMetadata(tx *bbolt.Tx) error { - mdb, err := getBlockMetadataBucket(tx) - switch { - case err == nil: - case errors.Is(err, bbolt.ErrBucketNotFound): - return nil - default: - return err - } - // List shards in the block_metadata bucket: - // block_metadata/[{shard_id}]/[block_id] - // 
TODO(kolesnikovae): Load concurrently. - return mdb.ForEachBucket(func(name []byte) error { - shardID, _, ok := parseBucketName(name) - if !ok { - _ = level.Error(m.logger).Log("msg", "malformed bucket name", "name", string(name)) - return nil - } - shard := m.getOrCreateShard(shardID) - return shard.loadSegments(mdb.Bucket(name)) - }) -} - func (m *metastoreState) restoreCompactionPlan(tx *bbolt.Tx) error { cdb, err := getCompactionJobBucket(tx) switch { @@ -165,38 +123,6 @@ func (m *metastoreState) findJob(name string) *compactionpb.CompactionJob { return nil } -func newMetastoreShard() *metastoreShard { - return &metastoreShard{ - segments: make(map[string]*metastorev1.BlockMeta), - } -} - -func (s *metastoreShard) putSegment(segment *metastorev1.BlockMeta) { - s.segmentsMutex.Lock() - s.segments[segment.Id] = segment - s.segmentsMutex.Unlock() -} - -func (s *metastoreShard) deleteSegment(segmentId string) { - s.segmentsMutex.Lock() - delete(s.segments, segmentId) - s.segmentsMutex.Unlock() -} - -func (s *metastoreShard) loadSegments(b *bbolt.Bucket) error { - s.segmentsMutex.Lock() - defer s.segmentsMutex.Unlock() - c := b.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - var md metastorev1.BlockMeta - if err := md.UnmarshalVT(v); err != nil { - return fmt.Errorf("failed to block %q: %w", string(k), err) - } - s.segments[md.Id] = &md - } - return nil -} - func (m *metastoreState) loadCompactionPlan(b *bbolt.Bucket, blockQueue *compactionJobBlockQueue) error { blockQueue.mu.Lock() defer blockQueue.mu.Unlock() diff --git a/pkg/experiment/metastore/metastore_state_add_block.go b/pkg/experiment/metastore/metastore_state_add_block.go index 13ae375047..961889e942 100644 --- a/pkg/experiment/metastore/metastore_state_add_block.go +++ b/pkg/experiment/metastore/metastore_state_add_block.go @@ -2,12 +2,12 @@ package metastore import ( "context" - "github.com/go-kit/log" "time" + "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/hashicorp/raft" + "github.com/oklog/ulid" "go.etcd.io/bbolt" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" @@ -15,6 +15,11 @@ import ( func (m *Metastore) AddBlock(_ context.Context, req *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) { l := log.With(m.logger, "shard", req.Block.Shard, "block_id", req.Block.Id, "ts", req.Block.MinTime) + _, err := ulid.Parse(req.Block.Id) + if err != nil { + _ = level.Warn(l).Log("failed to parse block id", "err", err) + return nil, err + } _ = level.Info(l).Log("msg", "adding block") t1 := time.Now() defer func() { @@ -42,16 +47,8 @@ func (m *Metastore) AddRecoveredBlock(_ context.Context, req *metastorev1.AddBlo } func (m *metastoreState) applyAddBlock(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) { - name, key := keyForBlockMeta(request.Block.Shard, "", request.Block.Id) - value, err := request.Block.MarshalVT() - if err != nil { - return nil, err - } - - err = m.db.boltdb.Update(func(tx *bbolt.Tx) error { - err := updateBlockMetadataBucket(tx, name, func(bucket *bbolt.Bucket) error { - return bucket.Put(key, value) - }) + err := m.db.boltdb.Update(func(tx *bbolt.Tx) error { + err := m.persistBlock(tx, request.Block) if err != nil { return err } @@ -68,6 +65,20 @@ func (m *metastoreState) applyAddBlock(log *raft.Log, request *metastorev1.AddBl ) return nil, err } - m.getOrCreateShard(request.Block.Shard).putSegment(request.Block) + m.index.InsertBlock(request.Block) return &metastorev1.AddBlockResponse{}, nil } 
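
// Illustrative sketch (not part of this change): with the ULID check added to
// AddBlock above, producers must supply ULID block identifiers. The helper
// below shows one way to build a block meta that passes that validation; the
// package and helper names are hypothetical, and the ULID construction mirrors
// what the tests in this diff use.
//
//	package example
//
//	import (
//		"crypto/rand"
//		"time"
//
//		"github.com/oklog/ulid"
//
//		metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
//	)
//
//	// newBlockMeta builds a minimal block metadata entry with a valid ULID id.
//	func newBlockMeta(shard uint32, tenant string) *metastorev1.BlockMeta {
//		now := time.Now()
//		return &metastorev1.BlockMeta{
//			Id:       ulid.MustNew(ulid.Timestamp(now), rand.Reader).String(),
//			Shard:    shard,
//			TenantId: tenant,
//			MinTime:  now.Add(-5 * time.Minute).UnixMilli(),
//			MaxTime:  now.UnixMilli(),
//		}
//	}
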
+ +func (m *metastoreState) persistBlock(tx *bbolt.Tx, block *metastorev1.BlockMeta) error { + key := []byte(block.Id) + value, err := block.MarshalVT() + if err != nil { + return err + } + + partKey := m.index.CreatePartitionKey(block.Id) + + return updateBlockMetadataBucket(tx, partKey, block.Shard, block.TenantId, func(bucket *bbolt.Bucket) error { + return bucket.Put(key, value) + }) +} diff --git a/pkg/experiment/metastore/metastore_state_get_profile_stats.go b/pkg/experiment/metastore/metastore_state_get_profile_stats.go index d9c393a6ab..84e29f9311 100644 --- a/pkg/experiment/metastore/metastore_state_get_profile_stats.go +++ b/pkg/experiment/metastore/metastore_state_get_profile_stats.go @@ -5,10 +5,9 @@ import ( "math" "sync" - "golang.org/x/sync/errgroup" - metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore/index" ) func (m *Metastore) GetProfileStats( @@ -26,52 +25,23 @@ func (m *metastoreState) getProfileStats(tenant string, ctx context.Context) (*t OldestProfileTime: math.MaxInt64, NewestProfileTime: math.MinInt64, } - m.shardsMutex.Lock() - defer m.shardsMutex.Unlock() - g, ctx := errgroup.WithContext(ctx) - for _, s := range m.shards { - s := s - g.Go(func() error { - oldest := int64(math.MaxInt64) - newest := int64(math.MinInt64) - ingested := len(s.segments) > 0 - for _, b := range s.segments { - if b.TenantId != "" && b.TenantId != tenant { - continue - } - hasTenant := b.TenantId == tenant - if !hasTenant { - for _, d := range b.Datasets { - if d.TenantId == tenant { - hasTenant = true - break - } - } - } - if !hasTenant { - continue - } - if b.MinTime < oldest { - oldest = b.MinTime - } - if b.MaxTime > newest { - newest = b.MaxTime - } - } - respMutex.Lock() - defer respMutex.Unlock() - resp.DataIngested = resp.DataIngested || ingested - if oldest < resp.OldestProfileTime { - resp.OldestProfileTime = oldest - } - if newest > resp.NewestProfileTime { - resp.NewestProfileTime = newest - } + err := m.index.ForEachPartition(ctx, func(p *index.PartitionMeta) error { + if !p.HasTenant(tenant) { return nil - }) - } - if err := g.Wait(); err != nil { - return nil, err - } - return &resp, nil + } + oldest := p.StartTime().UnixMilli() + newest := p.EndTime().UnixMilli() + respMutex.Lock() + defer respMutex.Unlock() + resp.DataIngested = true + if oldest < resp.OldestProfileTime { + resp.OldestProfileTime = oldest + } + if newest > resp.NewestProfileTime { + resp.NewestProfileTime = newest + } + return nil + }) + + return &resp, err } diff --git a/pkg/experiment/metastore/metastore_state_get_profile_stats_test.go b/pkg/experiment/metastore/metastore_state_get_profile_stats_test.go index 003cf8d6af..9d31846731 100644 --- a/pkg/experiment/metastore/metastore_state_get_profile_stats_test.go +++ b/pkg/experiment/metastore/metastore_state_get_profile_stats_test.go @@ -2,11 +2,12 @@ package metastore import ( "context" - "fmt" + "crypto/rand" "math" "testing" "github.com/hashicorp/raft" + "github.com/oklog/ulid" "github.com/stretchr/testify/require" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" @@ -27,21 +28,21 @@ func Test_MetastoreState_GetProfileStats_NoData(t *testing.T) { func Test_MetastoreState_GetProfileStats_MultipleShards(t *testing.T) { m := initState(t) _, _ = m.applyAddBlock(&raft.Log{}, &metastorev1.AddBlockRequest{Block: &metastorev1.BlockMeta{ - Id: "1-1", + Id: ulid.MustNew(ulid.Now(), 
rand.Reader).String(), Shard: 1, TenantId: "tenant1", MinTime: 20, MaxTime: 50, }}) _, _ = m.applyAddBlock(&raft.Log{}, &metastorev1.AddBlockRequest{Block: &metastorev1.BlockMeta{ - Id: "1-2", + Id: ulid.MustNew(ulid.Now(), rand.Reader).String(), Shard: 1, TenantId: "tenant2", MinTime: 30, MaxTime: 60, }}) _, _ = m.applyAddBlock(&raft.Log{}, &metastorev1.AddBlockRequest{Block: &metastorev1.BlockMeta{ - Id: "2-1", + Id: ulid.MustNew(ulid.Now(), rand.Reader).String(), Shard: 2, TenantId: "tenant1", MinTime: 10, @@ -50,11 +51,9 @@ func Test_MetastoreState_GetProfileStats_MultipleShards(t *testing.T) { stats, err := m.getProfileStats("tenant1", context.Background()) require.NoError(t, err) - require.Equal(t, &typesv1.GetProfileStatsResponse{ - DataIngested: true, - OldestProfileTime: 10, - NewestProfileTime: 50, - }, stats) + require.True(t, stats.DataIngested) + require.True(t, stats.OldestProfileTime > math.MinInt64) + require.True(t, stats.NewestProfileTime < math.MaxInt64) } func Benchmark_MetastoreState_GetProfileStats(b *testing.B) { @@ -68,8 +67,8 @@ func Benchmark_MetastoreState_GetProfileStats(b *testing.B) { // total: 123 blocks // monthly: 3690 blocks (usually less) for i := 0; i < 600; i++ { // level 0 - m.getOrCreateShard(uint32(s)).putSegment(&metastorev1.BlockMeta{ - Id: fmt.Sprintf("b-%d", i), + m.index.InsertBlock(&metastorev1.BlockMeta{ + Id: ulid.MustNew(ulid.Now(), rand.Reader).String(), Shard: uint32(s), MinTime: int64(i * 10), MaxTime: int64(i * 40), @@ -84,8 +83,8 @@ func Benchmark_MetastoreState_GetProfileStats(b *testing.B) { }) } for i := 0; i < 3400; i++ { - m.getOrCreateShard(uint32(s)).putSegment(&metastorev1.BlockMeta{ - Id: fmt.Sprintf("b-%d", i), + m.index.InsertBlock(&metastorev1.BlockMeta{ + Id: ulid.MustNew(ulid.Now(), rand.Reader).String(), Shard: uint32(s), TenantId: "tenant1", MinTime: int64(i * 10), diff --git a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go index 17f1034ed5..8b27f2de91 100644 --- a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go +++ b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go @@ -40,8 +40,8 @@ func (m *metastoreState) applyPollCompactionJobs(raft *raft.Log, request *compac } type pollStateUpdate struct { - newBlocks map[uint32][]string - deletedBlocks map[uint32][]string + newBlocks map[tenantShard][]*metastorev1.BlockMeta + deletedBlocks map[tenantShard][]string newJobs []string updatedBlockQueues map[tenantShard][]uint32 deletedJobs map[tenantShard][]string @@ -50,8 +50,8 @@ type pollStateUpdate struct { func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJobsRequest, raftIndex uint64, raftAppendedAtNanos int64) (resp *compactorv1.PollCompactionJobsResponse, err error) { stateUpdate := &pollStateUpdate{ - newBlocks: make(map[uint32][]string), - deletedBlocks: make(map[uint32][]string), + newBlocks: make(map[tenantShard][]*metastorev1.BlockMeta), + deletedBlocks: make(map[tenantShard][]string), newJobs: make([]string, 0), updatedBlockQueues: make(map[tenantShard][]uint32), deletedJobs: make(map[tenantShard][]string), @@ -79,12 +79,17 @@ func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJ fmt.Sprint(job.Shard), job.TenantId, fmt.Sprint(job.CompactionLevel)).Inc() // next we'll replace source blocks with compacted ones - // we need to acquire the shards lock first, to protect the read path from an inconsistent view of the data - m.shardsMutex.Lock() + 
m.index.ReplaceBlocks(job.Blocks, job.Shard, job.TenantId, jobUpdate.CompletedJob.Blocks) for _, b := range jobUpdate.CompletedJob.Blocks { - level.Debug(m.logger).Log("msg", "adding compacted block", "block", b.Id, "level", b.CompactionLevel, "source_job", job.Name) - m.shards[job.Shard].putSegment(b) - stateUpdate.newBlocks[job.Shard] = append(stateUpdate.newBlocks[job.Shard], b.Id) + level.Debug(m.logger).Log( + "msg", "added compacted block", + "block", b.Id, + "shard", b.Shard, + "tenant", b.TenantId, + "level", b.CompactionLevel, + "source_job", job.Name) + blockTenantShard := tenantShard{tenant: b.TenantId, shard: b.Shard} + stateUpdate.newBlocks[blockTenantShard] = append(stateUpdate.newBlocks[blockTenantShard], b) // adding new blocks to the compaction queue if jobForNewBlock := m.tryCreateJob(b, jobUpdate.RaftLogIndex); jobForNewBlock != nil { @@ -97,24 +102,22 @@ func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJ } m.compactionMetrics.addedBlocks.WithLabelValues( fmt.Sprint(job.Shard), job.TenantId, fmt.Sprint(job.CompactionLevel)).Inc() - blockTenantShard := tenantShard{tenant: b.TenantId, shard: b.Shard} + stateUpdate.updatedBlockQueues[blockTenantShard] = append(stateUpdate.updatedBlockQueues[blockTenantShard], b.CompactionLevel) } - // finally we'll delete the metadata for source blocks (this doesn't delete blocks from object store) for _, b := range job.Blocks { level.Debug(m.logger).Log( - "msg", "deleting source block", + "msg", "deleted source block", "block", b, - "tenant", job.TenantId, "shard", job.Shard, + "tenant", job.TenantId, "level", job.CompactionLevel, + "job", job.Name, ) - m.shards[job.Shard].deleteSegment(b) - stateUpdate.deletedBlocks[job.Shard] = append(stateUpdate.deletedBlocks[job.Shard], b) m.compactionMetrics.deletedBlocks.WithLabelValues( fmt.Sprint(job.Shard), job.TenantId, fmt.Sprint(job.CompactionLevel)).Inc() + stateUpdate.deletedBlocks[jobKey] = append(stateUpdate.deletedBlocks[jobKey], b) } - m.shardsMutex.Unlock() case compactorv1.CompactionStatus_COMPACTION_STATUS_IN_PROGRESS: level.Debug(m.logger).Log( "msg", "compaction job still in progress", @@ -204,7 +207,7 @@ func (m *metastoreState) convertJobs(jobs []*compactionpb.CompactionJob) (conver // populate block metadata (workers rely on it) blocks := make([]*metastorev1.BlockMeta, 0, len(job.Blocks)) for _, bId := range job.Blocks { - b := m.findBlock(job.Shard, bId) + b := m.index.FindBlock(job.Shard, job.TenantId, bId) if b == nil { level.Error(m.logger).Log( "msg", "failed to populate compaction job details, block not found", @@ -240,14 +243,6 @@ func (m *metastoreState) convertJobs(jobs []*compactionpb.CompactionJob) (conver return convertedJobs, invalidJobs } -func (m *metastoreState) findBlock(shard uint32, blockId string) *metastorev1.BlockMeta { - segmentShard := m.getOrCreateShard(shard) - segmentShard.segmentsMutex.Lock() - defer segmentShard.segmentsMutex.Unlock() - - return segmentShard.segments[blockId] -} - func (m *metastoreState) findJobsToAssign(jobCapacity int, raftLogIndex uint64, now int64) []*compactionpb.CompactionJob { jobsToAssign := make([]*compactionpb.CompactionJob, 0, jobCapacity) jobCount, newJobs, inProgressJobs, completedJobs, failedJobs, cancelledJobs := m.compactionJobQueue.stats() @@ -278,33 +273,17 @@ func (m *metastoreState) findJobsToAssign(jobCapacity int, raftLogIndex uint64, func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error { return m.db.boltdb.Update(func(tx *bbolt.Tx) error { - for shard, blocks := range 
sTable.newBlocks { - for _, b := range blocks { - block := m.findBlock(shard, b) - if block == nil { - level.Error(m.logger).Log( - "msg", "a newly compacted block could not be found", - "block", b, - "shard", shard, - ) - continue - } - name, key := keyForBlockMeta(shard, "", b) - err := updateBlockMetadataBucket(tx, name, func(bucket *bbolt.Bucket) error { - bValue, _ := block.MarshalVT() - return bucket.Put(key, bValue) - }) + for _, blocks := range sTable.newBlocks { + for _, block := range blocks { + err := m.persistBlock(tx, block) if err != nil { return err } } } - for shard, blocks := range sTable.deletedBlocks { - for _, b := range blocks { - name, key := keyForBlockMeta(shard, "", b) - err := updateBlockMetadataBucket(tx, name, func(bucket *bbolt.Bucket) error { - return bucket.Delete(key) - }) + for key, blocks := range sTable.deletedBlocks { + for _, block := range blocks { + err := m.deleteBlock(tx, key.shard, key.tenant, block) if err != nil { return err } @@ -377,3 +356,16 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error { return nil }) } + +func (m *metastoreState) deleteBlock(tx *bbolt.Tx, shardId uint32, tenant, blockId string) error { + metas := m.index.FindPartitionMetas(blockId) + for _, meta := range metas { + err := updateBlockMetadataBucket(tx, meta.Key, shardId, tenant, func(bucket *bbolt.Bucket) error { + return bucket.Delete([]byte(blockId)) + }) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs_test.go b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs_test.go index 3b02e6f77b..db03557f5f 100644 --- a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs_test.go +++ b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs_test.go @@ -1,11 +1,12 @@ package metastore import ( - "fmt" + "crypto/rand" "testing" "time" "github.com/hashicorp/raft" + "github.com/oklog/ulid" "github.com/stretchr/testify/require" compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" @@ -40,7 +41,7 @@ func Test_JobAssignments(t *testing.T) { func Test_StatusUpdates_Success(t *testing.T) { // add enough blocks to create 2 jobs m := initState(t) - addLevel0Blocks(m, 40) + sourceBlocks := addLevel0Blocks(m, 40) require.Equal(t, 2, len(m.compactionJobQueue.jobs)) // assign the 2 jobs @@ -54,7 +55,7 @@ func Test_StatusUpdates_Success(t *testing.T) { JobName: resp.CompactionJobs[0].Name, Status: compactorv1.CompactionStatus_COMPACTION_STATUS_SUCCESS, CompletedJob: &compactorv1.CompletedJob{ - Blocks: []*metastorev1.BlockMeta{createBlock(40, 0, "", 1)}, + Blocks: []*metastorev1.BlockMeta{createBlock(0, "", 1)}, }, RaftLogIndex: 20, Shard: 0, @@ -64,7 +65,7 @@ func Test_StatusUpdates_Success(t *testing.T) { JobName: resp.CompactionJobs[1].Name, Status: compactorv1.CompactionStatus_COMPACTION_STATUS_SUCCESS, CompletedJob: &compactorv1.CompletedJob{ - Blocks: []*metastorev1.BlockMeta{createBlock(41, 0, "", 1)}, + Blocks: []*metastorev1.BlockMeta{createBlock(0, "", 1)}, }, RaftLogIndex: 20, Shard: 0, @@ -79,16 +80,16 @@ func Test_StatusUpdates_Success(t *testing.T) { require.Equalf(t, 0, len(m.compactionJobQueue.jobs), "compaction job queue should be empty") // compacted blocks are added - b40 := m.getOrCreateShard(0).segments["b-40"] - b41 := m.getOrCreateShard(0).segments["b-41"] - require.NotNilf(t, b40, "compacted block not found in state") - require.NotNilf(t, b41, "compacted block not found in state") - require.Equalf(t, uint32(1), 
b40.CompactionLevel, "compacted block has wrong level") - require.Equalf(t, uint32(1), b41.CompactionLevel, "compacted block has wrong level") + blockOne := m.index.FindBlock(0, "", statusUpdates[0].CompletedJob.Blocks[0].Id) + blockTwo := m.index.FindBlock(0, "", statusUpdates[1].CompletedJob.Blocks[0].Id) + require.NotNilf(t, blockOne, "compacted block not found in state") + require.NotNilf(t, blockTwo, "compacted block not found in state") + require.Equalf(t, uint32(1), blockOne.CompactionLevel, "compacted block has wrong level") + require.Equalf(t, uint32(1), blockTwo.CompactionLevel, "compacted block has wrong level") // source blocks are removed - for i := 0; i < 40; i++ { - require.Nilf(t, m.getOrCreateShard(0).segments[fmt.Sprintf("b-%d", i)], "old block %d found in state", i) + for _, b := range sourceBlocks { + require.Nilf(t, m.index.FindBlock(b.Shard, b.TenantId, b.Id), "old block %s found in state", b.Id) } } @@ -170,7 +171,7 @@ func Test_OwnershipTransfer(t *testing.T) { JobName: resp.CompactionJobs[0].Name, Status: compactorv1.CompactionStatus_COMPACTION_STATUS_SUCCESS, CompletedJob: &compactorv1.CompletedJob{ - Blocks: []*metastorev1.BlockMeta{createBlock(20, 0, "", 1)}, + Blocks: []*metastorev1.BlockMeta{createBlock(0, "", 1)}, }, RaftLogIndex: 21, Shard: 0, @@ -203,13 +204,13 @@ func Test_CompactedBlockCanCreateNewJob(t *testing.T) { CompletedJob: &compactorv1.CompletedJob{ Blocks: []*metastorev1.BlockMeta{ { - Id: "b-20-1", + Id: ulid.MustNew(ulid.Now(), rand.Reader).String(), Shard: uint32(0), TenantId: "t1", CompactionLevel: uint32(1), }, { - Id: "b-21-1", + Id: ulid.MustNew(ulid.Now(), rand.Reader).String(), Shard: uint32(0), TenantId: "t1", CompactionLevel: uint32(1), @@ -235,7 +236,7 @@ func Test_CompactedBlockCanCreateNewJob(t *testing.T) { shard: 0, } require.Equalf(t, 1, len(m.compactionJobBlockQueues[key].blocksByLevel[1]), "there should be one level-1 block in the queue") - require.Equalf(t, "b-21-1", m.compactionJobBlockQueues[key].blocksByLevel[1][0], "the block id should match the second compacted block") + require.Equalf(t, statusUpdates[0].CompletedJob.Blocks[1].Id, m.compactionJobBlockQueues[key].blocksByLevel[1][0], "the block id should match the second compacted block") } func Test_FailedCompaction(t *testing.T) { @@ -287,15 +288,15 @@ func Test_PanicWithDbErrors(t *testing.T) { func Test_RemoveInvalidJobsFromStorage(t *testing.T) { m := initState(t) - addLevel0Blocks(m, 20) + blocks := addLevel0Blocks(m, 20) require.Equal(t, 1, len(m.compactionJobQueue.jobs), "there should be one job in the queue") // delete all blocks, making the existing job invalid - for _, shard := range m.shards { - for _, segment := range shard.segments { - shard.deleteSegment(segment.Id) - } + sources := make([]string, 0, 20) + for _, block := range blocks { + sources = append(sources, block.Id) } + m.index.ReplaceBlocks(sources, 0, "", []*metastorev1.BlockMeta{}) // try to assign the job resp, err := m.pollCompactionJobs(&compactorv1.PollCompactionJobsRequest{JobCapacity: 1}, 20, 20) @@ -305,32 +306,25 @@ func Test_RemoveInvalidJobsFromStorage(t *testing.T) { verifyCompactionState(t, m) } -func addLevel0Blocks(m *metastoreState, count int) { - for i := 0; i < count; i++ { - b := createBlock(i, 0, "", 0) - raftLog := &raft.Log{ - Index: uint64(i), - AppendedAt: time.Unix(0, int64(i)), - } - _, _ = m.applyAddBlock(raftLog, &metastorev1.AddBlockRequest{Block: b}) - } -} - -func addLevel0BlocksForShard(m *metastoreState, count int, shard int) { +func addLevel0Blocks(m 
*metastoreState, count int) []*metastorev1.BlockMeta { + blocks := make([]*metastorev1.BlockMeta, 0, count) for i := 0; i < count; i++ { - b := createBlock(i, shard, "", 0) + b := createBlock(0, "", 0) + b.MinTime = time.Now().UnixMilli() + b.MaxTime = time.Now().UnixMilli() + blocks = append(blocks, b) raftLog := &raft.Log{ Index: uint64(i), AppendedAt: time.Unix(0, int64(i)), } _, _ = m.applyAddBlock(raftLog, &metastorev1.AddBlockRequest{Block: b}) } + return blocks } func addLevel1Blocks(m *metastoreState, tenant string, count int) { for i := 0; i < count; i++ { - b := createBlock(i, 0, tenant, 1) - b.Id = fmt.Sprintf("b-%d-%d", i, 1) + b := createBlock(0, tenant, 1) raftLog := &raft.Log{ Index: uint64(i), AppendedAt: time.Unix(0, int64(i)), diff --git a/pkg/experiment/metastore/metastore_state_query_metadata.go b/pkg/experiment/metastore/metastore_state_query_metadata.go index 64d9d88d7c..11a48691e3 100644 --- a/pkg/experiment/metastore/metastore_state_query_metadata.go +++ b/pkg/experiment/metastore/metastore_state_query_metadata.go @@ -5,11 +5,10 @@ import ( "fmt" "slices" "strings" - "sync" + "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" - "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -32,6 +31,10 @@ type metadataQuery struct { serviceMatcher *labels.Matcher } +func (q *metadataQuery) String() string { + return fmt.Sprintf("start: %d, end: %d, tenants: %v, serviceMatcher: %v", q.startTime, q.endTime, q.tenants, q.serviceMatcher) +} + func newMetadataQuery(request *metastorev1.QueryMetadataRequest) (*metadataQuery, error) { if len(request.TenantId) == 0 { return nil, fmt.Errorf("tenant_id is required") @@ -60,10 +63,6 @@ func newMetadataQuery(request *metastorev1.QueryMetadataRequest) (*metadataQuery return q, nil } -func (q *metadataQuery) matchBlock(b *metastorev1.BlockMeta) bool { - return inRange(b.MinTime, b.MaxTime, q.startTime, q.endTime) -} - func (q *metadataQuery) matchService(s *metastorev1.Dataset) bool { _, ok := q.tenants[s.TenantId] if !ok { @@ -82,28 +81,6 @@ func inRange(blockStart, blockEnd, queryStart, queryEnd int64) bool { return blockStart <= queryEnd && blockEnd >= queryStart } -func (s *metastoreShard) listBlocksForQuery(q *metadataQuery) map[string]*metastorev1.BlockMeta { - s.segmentsMutex.Lock() - defer s.segmentsMutex.Unlock() - md := make(map[string]*metastorev1.BlockMeta, 32) - for _, segment := range s.segments { - if !q.matchBlock(segment) { - continue - } - var block *metastorev1.BlockMeta - for _, svc := range segment.Datasets { - if q.matchService(svc) { - if block == nil { - block = cloneBlockForQuery(segment) - md[segment.Id] = block - } - block.Datasets = append(block.Datasets, svc) - } - } - } - return md -} - func cloneBlockForQuery(b *metastorev1.BlockMeta) *metastorev1.BlockMeta { datasets := b.Datasets b.Datasets = nil @@ -121,25 +98,30 @@ func (m *metastoreState) listBlocksForQuery( if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } - var respMutex sync.Mutex var resp metastorev1.QueryMetadataResponse - g, ctx := errgroup.WithContext(ctx) - m.shardsMutex.Lock() - for _, s := range m.shards { - s := s - g.Go(func() error { - blocks := s.listBlocksForQuery(q) - respMutex.Lock() - for _, b := range blocks { - resp.Blocks = append(resp.Blocks, b) + + md := make(map[string]*metastorev1.BlockMeta, 32) + blocks, err := m.index.FindBlocksInRange(q.startTime, q.endTime, q.tenants) + if err != 
nil { + level.Error(m.logger).Log("msg", "failed to list metastore blocks", "query", q, "err", err) + return nil, status.Error(codes.Internal, err.Error()) + } + for _, block := range blocks { + var clone *metastorev1.BlockMeta + for _, svc := range block.Datasets { + if q.matchService(svc) { + if clone == nil { + clone = cloneBlockForQuery(block) + md[clone.Id] = clone + } + clone.Datasets = append(clone.Datasets, svc) } - respMutex.Unlock() - return nil - }) + } } - m.shardsMutex.Unlock() - if err = g.Wait(); err != nil { - return nil, err + + resp.Blocks = make([]*metastorev1.BlockMeta, 0, len(md)) + for _, block := range md { + resp.Blocks = append(resp.Blocks, block) } slices.SortFunc(resp.Blocks, func(a, b *metastorev1.BlockMeta) int { return strings.Compare(a.Id, b.Id) diff --git a/pkg/experiment/metastore/metastore_state_test.go b/pkg/experiment/metastore/metastore_state_test.go index f60aa2c2d1..f7218a4269 100644 --- a/pkg/experiment/metastore/metastore_state_test.go +++ b/pkg/experiment/metastore/metastore_state_test.go @@ -9,6 +9,7 @@ import ( "go.etcd.io/bbolt" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + "github.com/grafana/pyroscope/pkg/experiment/metastore/index" "github.com/grafana/pyroscope/pkg/util" ) @@ -22,7 +23,7 @@ func TestMetadataStateManagement(t *testing.T) { err := db.open(false) require.NoError(t, err) - m := newMetastoreState(util.Logger, db, reg, &config.Compaction) + m := newMetastoreState(util.Logger, db, reg, &config.Compaction, &index.DefaultConfig) require.NotNil(t, m) t.Run("restore compaction state", func(t *testing.T) { @@ -58,34 +59,4 @@ func TestMetadataStateManagement(t *testing.T) { require.Equal(t, 1, len(queue.blocksByLevel)) require.Equal(t, 5, len(queue.blocksByLevel[0])) }) - - t.Run("restore block state", func(t *testing.T) { - for i := 0; i < 420; i++ { - err = db.boltdb.Update(func(tx *bbolt.Tx) error { - block := &metastorev1.BlockMeta{ - Id: fmt.Sprintf("b-%d", i), - Shard: uint32(i % 4), - } - name, key := keyForBlockMeta(block.Shard, "", block.Id) - value, err := block.MarshalVT() - require.NoError(t, err) - err = updateBlockMetadataBucket(tx, name, func(bucket *bbolt.Bucket) error { - return bucket.Put(key, value) - }) - return err - }) - require.NoError(t, err) - } - - // restore from db - err = db.boltdb.Update(func(tx *bbolt.Tx) error { - return m.restoreBlockMetadata(tx) - }) - require.NoError(t, err) - - require.Equal(t, 4, len(m.shards)) - for shard := range m.shards { - require.Equal(t, 105, len(m.getOrCreateShard(shard).segments)) - } - }) } diff --git a/pkg/experiment/metastore/raftlogpb/raflog.pb.go b/pkg/experiment/metastore/raftlogpb/raflog.pb.go index 19014faa75..1e944558ec 100644 --- a/pkg/experiment/metastore/raftlogpb/raflog.pb.go +++ b/pkg/experiment/metastore/raftlogpb/raflog.pb.go @@ -26,23 +26,19 @@ const ( CommandType_COMMAND_TYPE_UNKNOWN CommandType = 0 CommandType_COMMAND_TYPE_ADD_BLOCK CommandType = 1 CommandType_COMMAND_TYPE_POLL_COMPACTION_JOBS_STATUS CommandType = 2 - // This is a temporary solution. - CommandType_COMMAND_TYPE_TRUNCATE CommandType = 4196 ) // Enum value maps for CommandType. 
var ( CommandType_name = map[int32]string{ - 0: "COMMAND_TYPE_UNKNOWN", - 1: "COMMAND_TYPE_ADD_BLOCK", - 2: "COMMAND_TYPE_POLL_COMPACTION_JOBS_STATUS", - 4196: "COMMAND_TYPE_TRUNCATE", + 0: "COMMAND_TYPE_UNKNOWN", + 1: "COMMAND_TYPE_ADD_BLOCK", + 2: "COMMAND_TYPE_POLL_COMPACTION_JOBS_STATUS", } CommandType_value = map[string]int32{ "COMMAND_TYPE_UNKNOWN": 0, "COMMAND_TYPE_ADD_BLOCK": 1, "COMMAND_TYPE_POLL_COMPACTION_JOBS_STATUS": 2, - "COMMAND_TYPE_TRUNCATE": 4196, } ) @@ -189,26 +185,25 @@ var file_experiment_metastore_raftlogpb_raflog_proto_rawDesc = []byte{ 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x2f, 0x0a, 0x0f, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2a, 0x8d, 0x01, - 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, - 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, - 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x43, 0x4f, 0x4d, 0x4d, 0x41, - 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x44, 0x44, 0x5f, 0x42, 0x4c, 0x4f, 0x43, - 0x4b, 0x10, 0x01, 0x12, 0x2c, 0x0a, 0x28, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, - 0x59, 0x50, 0x45, 0x5f, 0x50, 0x4f, 0x4c, 0x4c, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, - 0x49, 0x4f, 0x4e, 0x5f, 0x4a, 0x4f, 0x42, 0x53, 0x5f, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x10, - 0x02, 0x12, 0x1a, 0x0a, 0x15, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, - 0x45, 0x5f, 0x54, 0x52, 0x55, 0x4e, 0x43, 0x41, 0x54, 0x45, 0x10, 0xe4, 0x20, 0x42, 0x98, 0x01, - 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x42, 0x0b, - 0x52, 0x61, 0x66, 0x6c, 0x6f, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3f, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, - 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x70, 0x6b, 0x67, 0x2f, - 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x2f, 0x6d, 0x65, 0x74, 0x61, 0x73, - 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x72, 0x61, 0x66, 0x74, 0x6c, 0x6f, 0x67, 0x70, 0x62, 0xa2, 0x02, - 0x03, 0x52, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xca, 0x02, - 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xe2, 0x02, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4c, - 0x6f, 0x67, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, - 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2a, 0x71, 0x0a, + 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, + 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, + 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1a, 0x0a, 0x16, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, + 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x41, 0x44, 0x44, 0x5f, 0x42, 0x4c, 0x4f, 0x43, 0x4b, + 0x10, 0x01, 0x12, 0x2c, 0x0a, 0x28, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x50, 0x4f, 0x4c, 0x4c, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, + 0x4f, 0x4e, 0x5f, 0x4a, 0x4f, 0x42, 0x53, 0x5f, 
0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x10, 0x02, + 0x42, 0x98, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, + 0x67, 0x42, 0x0b, 0x52, 0x61, 0x66, 0x6c, 0x6f, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, + 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, + 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x2f, 0x6d, 0x65, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x72, 0x61, 0x66, 0x74, 0x6c, 0x6f, 0x67, 0x70, + 0x62, 0xa2, 0x02, 0x03, 0x52, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, + 0x67, 0xca, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xe2, 0x02, 0x13, 0x52, 0x61, + 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0xea, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/experiment/metastore/raftlogpb/raflog.proto b/pkg/experiment/metastore/raftlogpb/raflog.proto index fa41df2fd9..8f1282bc01 100644 --- a/pkg/experiment/metastore/raftlogpb/raflog.proto +++ b/pkg/experiment/metastore/raftlogpb/raflog.proto @@ -11,9 +11,6 @@ enum CommandType { COMMAND_TYPE_UNKNOWN = 0; COMMAND_TYPE_ADD_BLOCK = 1; COMMAND_TYPE_POLL_COMPACTION_JOBS_STATUS = 2; - - // This is a temporary solution. - COMMAND_TYPE_TRUNCATE = 4196; } message TruncateCommand { diff --git a/pkg/frontend/read_path/query_frontend/query_select_merge_profile.go b/pkg/frontend/read_path/query_frontend/query_select_merge_profile.go index 6655993478..333529d995 100644 --- a/pkg/frontend/read_path/query_frontend/query_select_merge_profile.go +++ b/pkg/frontend/read_path/query_frontend/query_select_merge_profile.go @@ -59,7 +59,7 @@ func (q *QueryFrontend) SelectMergeProfile( return nil, err } if report == nil { - return nil, nil + return connect.NewResponse(&profilev1.Profile{}), nil } var p profilev1.Profile if err = pprof.Unmarshal(report.Pprof.Pprof, &p); err != nil { diff --git a/pkg/test/mocks/mockindex/mock_store.go b/pkg/test/mocks/mockindex/mock_store.go new file mode 100644 index 0000000000..4139ca576f --- /dev/null +++ b/pkg/test/mocks/mockindex/mock_store.go @@ -0,0 +1,289 @@ +// Code generated by mockery. DO NOT EDIT. 
+ +package mockindex + +import ( + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + index "github.com/grafana/pyroscope/pkg/experiment/metastore/index" + + mock "github.com/stretchr/testify/mock" +) + +// MockStore is an autogenerated mock type for the Store type +type MockStore struct { + mock.Mock +} + +type MockStore_Expecter struct { + mock *mock.Mock +} + +func (_m *MockStore) EXPECT() *MockStore_Expecter { + return &MockStore_Expecter{mock: &_m.Mock} +} + +// ListBlocks provides a mock function with given fields: p, shard, tenant +func (_m *MockStore) ListBlocks(p index.PartitionKey, shard uint32, tenant string) []*metastorev1.BlockMeta { + ret := _m.Called(p, shard, tenant) + + if len(ret) == 0 { + panic("no return value specified for ListBlocks") + } + + var r0 []*metastorev1.BlockMeta + if rf, ok := ret.Get(0).(func(index.PartitionKey, uint32, string) []*metastorev1.BlockMeta); ok { + r0 = rf(p, shard, tenant) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*metastorev1.BlockMeta) + } + } + + return r0 +} + +// MockStore_ListBlocks_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListBlocks' +type MockStore_ListBlocks_Call struct { + *mock.Call +} + +// ListBlocks is a helper method to define mock.On call +// - p index.PartitionKey +// - shard uint32 +// - tenant string +func (_e *MockStore_Expecter) ListBlocks(p interface{}, shard interface{}, tenant interface{}) *MockStore_ListBlocks_Call { + return &MockStore_ListBlocks_Call{Call: _e.mock.On("ListBlocks", p, shard, tenant)} +} + +func (_c *MockStore_ListBlocks_Call) Run(run func(p index.PartitionKey, shard uint32, tenant string)) *MockStore_ListBlocks_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(index.PartitionKey), args[1].(uint32), args[2].(string)) + }) + return _c +} + +func (_c *MockStore_ListBlocks_Call) Return(_a0 []*metastorev1.BlockMeta) *MockStore_ListBlocks_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStore_ListBlocks_Call) RunAndReturn(run func(index.PartitionKey, uint32, string) []*metastorev1.BlockMeta) *MockStore_ListBlocks_Call { + _c.Call.Return(run) + return _c +} + +// ListPartitions provides a mock function with given fields: +func (_m *MockStore) ListPartitions() []index.PartitionKey { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ListPartitions") + } + + var r0 []index.PartitionKey + if rf, ok := ret.Get(0).(func() []index.PartitionKey); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]index.PartitionKey) + } + } + + return r0 +} + +// MockStore_ListPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListPartitions' +type MockStore_ListPartitions_Call struct { + *mock.Call +} + +// ListPartitions is a helper method to define mock.On call +func (_e *MockStore_Expecter) ListPartitions() *MockStore_ListPartitions_Call { + return &MockStore_ListPartitions_Call{Call: _e.mock.On("ListPartitions")} +} + +func (_c *MockStore_ListPartitions_Call) Run(run func()) *MockStore_ListPartitions_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStore_ListPartitions_Call) Return(_a0 []index.PartitionKey) *MockStore_ListPartitions_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStore_ListPartitions_Call) RunAndReturn(run func() []index.PartitionKey) *MockStore_ListPartitions_Call { + _c.Call.Return(run) + return _c +} + +// ListShards provides a 
mock function with given fields: p +func (_m *MockStore) ListShards(p index.PartitionKey) []uint32 { + ret := _m.Called(p) + + if len(ret) == 0 { + panic("no return value specified for ListShards") + } + + var r0 []uint32 + if rf, ok := ret.Get(0).(func(index.PartitionKey) []uint32); ok { + r0 = rf(p) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]uint32) + } + } + + return r0 +} + +// MockStore_ListShards_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListShards' +type MockStore_ListShards_Call struct { + *mock.Call +} + +// ListShards is a helper method to define mock.On call +// - p index.PartitionKey +func (_e *MockStore_Expecter) ListShards(p interface{}) *MockStore_ListShards_Call { + return &MockStore_ListShards_Call{Call: _e.mock.On("ListShards", p)} +} + +func (_c *MockStore_ListShards_Call) Run(run func(p index.PartitionKey)) *MockStore_ListShards_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(index.PartitionKey)) + }) + return _c +} + +func (_c *MockStore_ListShards_Call) Return(_a0 []uint32) *MockStore_ListShards_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStore_ListShards_Call) RunAndReturn(run func(index.PartitionKey) []uint32) *MockStore_ListShards_Call { + _c.Call.Return(run) + return _c +} + +// ListTenants provides a mock function with given fields: p, shard +func (_m *MockStore) ListTenants(p index.PartitionKey, shard uint32) []string { + ret := _m.Called(p, shard) + + if len(ret) == 0 { + panic("no return value specified for ListTenants") + } + + var r0 []string + if rf, ok := ret.Get(0).(func(index.PartitionKey, uint32) []string); ok { + r0 = rf(p, shard) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + return r0 +} + +// MockStore_ListTenants_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListTenants' +type MockStore_ListTenants_Call struct { + *mock.Call +} + +// ListTenants is a helper method to define mock.On call +// - p index.PartitionKey +// - shard uint32 +func (_e *MockStore_Expecter) ListTenants(p interface{}, shard interface{}) *MockStore_ListTenants_Call { + return &MockStore_ListTenants_Call{Call: _e.mock.On("ListTenants", p, shard)} +} + +func (_c *MockStore_ListTenants_Call) Run(run func(p index.PartitionKey, shard uint32)) *MockStore_ListTenants_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(index.PartitionKey), args[1].(uint32)) + }) + return _c +} + +func (_c *MockStore_ListTenants_Call) Return(_a0 []string) *MockStore_ListTenants_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStore_ListTenants_Call) RunAndReturn(run func(index.PartitionKey, uint32) []string) *MockStore_ListTenants_Call { + _c.Call.Return(run) + return _c +} + +// ReadPartitionMeta provides a mock function with given fields: p +func (_m *MockStore) ReadPartitionMeta(p index.PartitionKey) (*index.PartitionMeta, error) { + ret := _m.Called(p) + + if len(ret) == 0 { + panic("no return value specified for ReadPartitionMeta") + } + + var r0 *index.PartitionMeta + var r1 error + if rf, ok := ret.Get(0).(func(index.PartitionKey) (*index.PartitionMeta, error)); ok { + return rf(p) + } + if rf, ok := ret.Get(0).(func(index.PartitionKey) *index.PartitionMeta); ok { + r0 = rf(p) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*index.PartitionMeta) + } + } + + if rf, ok := ret.Get(1).(func(index.PartitionKey) error); ok { + r1 = rf(p) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// 
MockStore_ReadPartitionMeta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadPartitionMeta' +type MockStore_ReadPartitionMeta_Call struct { + *mock.Call +} + +// ReadPartitionMeta is a helper method to define mock.On call +// - p index.PartitionKey +func (_e *MockStore_Expecter) ReadPartitionMeta(p interface{}) *MockStore_ReadPartitionMeta_Call { + return &MockStore_ReadPartitionMeta_Call{Call: _e.mock.On("ReadPartitionMeta", p)} +} + +func (_c *MockStore_ReadPartitionMeta_Call) Run(run func(p index.PartitionKey)) *MockStore_ReadPartitionMeta_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(index.PartitionKey)) + }) + return _c +} + +func (_c *MockStore_ReadPartitionMeta_Call) Return(_a0 *index.PartitionMeta, _a1 error) *MockStore_ReadPartitionMeta_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockStore_ReadPartitionMeta_Call) RunAndReturn(run func(index.PartitionKey) (*index.PartitionMeta, error)) *MockStore_ReadPartitionMeta_Call { + _c.Call.Return(run) + return _c +} + +// NewMockStore creates a new instance of MockStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockStore(t interface { + mock.TestingT + Cleanup(func()) +}) *MockStore { + mock := &MockStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/tools/dev/experiment/values-micro-services-experiment.yaml b/tools/dev/experiment/values-micro-services-experiment.yaml index 8bb0648e04..db588fa2e1 100644 --- a/tools/dev/experiment/values-micro-services-experiment.yaml +++ b/tools/dev/experiment/values-micro-services-experiment.yaml @@ -2,13 +2,17 @@ pyroscope: extraEnvVars: PYROSCOPE_V2_EXPERIMENT: 1 extraArgs: - query-backend.address: "dns:///_grpc._tcp.pyroscope-query-worker-headless.$(NAMESPACE_FQDN):9095" + query-backend.address: "dns:///_grpc._tcp.pyroscope-query-backend-headless.$(NAMESPACE_FQDN):9095" metastore.address: "kubernetes:///pyroscope-metastore-headless.$(NAMESPACE_FQDN):9095" metastore.raft.bind-address: ":9099" metastore.raft.server-id: "$(POD_NAME).pyroscope-metastore-headless.$(NAMESPACE_FQDN):9099" metastore.raft.advertise-address: "$(POD_NAME).pyroscope-metastore-headless.$(NAMESPACE_FQDN):9099" metastore.raft.bootstrap-peers: "dnssrvnoa+_raft._tcp.pyroscope-metastore-headless.$(NAMESPACE_FQDN):9099" metastore.raft.bootstrap-expect-peers: "3" + distributor.replication-factor: "1" + write-path: "segment-writer" + enable-query-backend: "true" + querier.max-query-length: '7d' components: distributor: