Skip to content

Commit

Permalink
Improve FindBlocksInRange, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-p committed Sep 27, 2024
1 parent 8a451dc commit 5755b3e
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 90 deletions.
57 changes: 18 additions & 39 deletions pkg/experiment/metastore/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,24 @@ type Index struct {
}

type Config struct {
PartitionDuration time.Duration `yaml:"partition_duration"`
PartitionTTL time.Duration `yaml:"partition_ttl"`
CleanupInterval time.Duration `yaml:"cleanup_interval"`
PartitionDuration time.Duration `yaml:"partition_duration"`
PartitionTTL time.Duration `yaml:"partition_ttl"`
CleanupInterval time.Duration `yaml:"cleanup_interval"`
QueryLookaroundPeriod time.Duration `yaml:"query_lookaround_period"`
}

// RegisterFlagsWithPrefix registers the index duration settings on f.
// Each flag is named prefix followed by the setting's kebab-case name and
// defaults to the matching DefaultConfig field. Usage strings are left empty.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	durations := []struct {
		target   *time.Duration
		name     string
		fallback time.Duration
	}{
		{&cfg.PartitionDuration, "partition-duration", DefaultConfig.PartitionDuration},
		{&cfg.PartitionTTL, "partition-ttl", DefaultConfig.PartitionTTL},
		{&cfg.CleanupInterval, "cleanup-interval", DefaultConfig.CleanupInterval},
		{&cfg.QueryLookaroundPeriod, "query-lookaround-period", DefaultConfig.QueryLookaroundPeriod},
	}
	// Registration order matches the field order of Config.
	for _, d := range durations {
		f.DurationVar(d.target, prefix+d.name, d.fallback, "")
	}
}

var DefaultConfig = Config{
PartitionDuration: time.Hour,
PartitionTTL: 4 * time.Hour,
CleanupInterval: 5 * time.Minute,
PartitionDuration: time.Hour,
PartitionTTL: 4 * time.Hour,
CleanupInterval: 5 * time.Minute,
QueryLookaroundPeriod: time.Hour,
}

type indexPartition struct {
Expand Down Expand Up @@ -361,47 +364,21 @@ func (i *Index) findBlockInPartition(key PartitionKey, shard uint32, tenant stri
func (i *Index) FindBlocksInRange(start, end int64, tenants map[string]struct{}) ([]*metastorev1.BlockMeta, error) {
i.partitionMu.Lock()
defer i.partitionMu.Unlock()
startWithLookaround := start - i.Config.QueryLookaroundPeriod.Milliseconds()
endWithLookaround := end + i.Config.QueryLookaroundPeriod.Milliseconds()

blocks := make([]*metastorev1.BlockMeta, 0)

firstPartitionIdx, lastPartitionIdx := -1, -1
for idx, meta := range i.allPartitions {
if meta.overlaps(start, end) {
if firstPartitionIdx == -1 {
firstPartitionIdx = idx
}
for _, meta := range i.allPartitions { // TODO aleks-p: consider using binary search to find a good starting point
if meta.overlaps(startWithLookaround, endWithLookaround) {
p, err := i.getOrLoadPartition(meta)
if err != nil {
level.Error(i.logger).Log("msg", "error loading partition", "key", meta.Key, "err", err)
return nil, err
}
tenantBlocks := i.collectTenantBlocks(p, tenants)
tenantBlocks := i.collectTenantBlocks(p, start, end, tenants)
blocks = append(blocks, tenantBlocks...)
} else if firstPartitionIdx != -1 {
lastPartitionIdx = idx - 1
}
}

if firstPartitionIdx > 0 {
meta := i.allPartitions[firstPartitionIdx-1]
p, err := i.getOrLoadPartition(meta)
if err != nil {
level.Error(i.logger).Log("msg", "error loading previous partition", "key", meta.Key, "err", err)
return nil, err
}
tenantBlocks := i.collectTenantBlocks(p, tenants)
blocks = append(blocks, tenantBlocks...)
}

if lastPartitionIdx > -1 && lastPartitionIdx < len(i.allPartitions)-1 {
meta := i.allPartitions[lastPartitionIdx+1]
p, err := i.getOrLoadPartition(meta)
if err != nil {
level.Error(i.logger).Log("msg", "error loading next partition", "key", meta.Key, "err", err)
return nil, err
}
tenantBlocks := i.collectTenantBlocks(p, tenants)
blocks = append(blocks, tenantBlocks...)
}

return blocks, nil
Expand All @@ -413,7 +390,7 @@ func (i *Index) sortPartitions() {
})
}

func (i *Index) collectTenantBlocks(p *indexPartition, tenants map[string]struct{}) []*metastorev1.BlockMeta {
func (i *Index) collectTenantBlocks(p *indexPartition, start, end int64, tenants map[string]struct{}) []*metastorev1.BlockMeta {
blocks := make([]*metastorev1.BlockMeta, 0)
for _, s := range p.shards {
for tKey, t := range s.tenants {
Expand All @@ -422,7 +399,9 @@ func (i *Index) collectTenantBlocks(p *indexPartition, tenants map[string]struct
continue
}
for _, block := range t.blocks {
blocks = append(blocks, block)
if start < block.MaxTime && end >= block.MinTime {
blocks = append(blocks, block)
}
}
}
}
Expand Down
139 changes: 97 additions & 42 deletions pkg/experiment/metastore/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package index_test
import (
"context"
"crypto/rand"
"math"
"sync"
"testing"
"time"
Expand All @@ -21,49 +20,93 @@ import (
)

func TestIndex_FindBlocksInRange(t *testing.T) {
store := mockindex.NewMockStore(t)
i := index.NewIndex(store, util.Logger, &index.DefaultConfig)

type partitionBlocks struct {
key index.PartitionKey
blocks []*metastorev1.BlockMeta
tests := []struct {
name string
blocks []*metastorev1.BlockMeta
queryStart int64
queryEnd int64
queryTenants []string
want int
}{
{
name: "matching blocks",
blocks: []*metastorev1.BlockMeta{
createBlock("20240923T06.1h", 0),
createBlock("20240923T07.1h", 0),
createBlock("20240923T08.1h", 0),
createBlock("20240923T09.1h", 0),
createBlock("20240923T10.1h", 0),
},
queryStart: createTime("2024-09-23T08:00:00.000Z"),
queryEnd: createTime("2024-09-23T09:00:00.000Z"),
want: 2,
},
{
name: "no matching blocks",
blocks: []*metastorev1.BlockMeta{
createBlock("20240923T06.1h", 0),
createBlock("20240923T07.1h", 0),
createBlock("20240923T08.1h", 0),
createBlock("20240923T09.1h", 0),
createBlock("20240923T10.1h", 0),
},
queryStart: createTime("2024-09-23T04:00:00.000Z"),
queryEnd: createTime("2024-09-23T05:00:00.000Z"),
want: 0,
},
{
name: "out of order ingestion (behind on time)",
blocks: []*metastorev1.BlockMeta{
createBlock("20240923T06.1h", 0),
createBlock("20240923T07.1h", -1*time.Hour), // in range
createBlock("20240923T07.1h", -2*time.Hour), // in range
createBlock("20240923T07.1h", -3*time.Hour), // too old
createBlock("20240923T08.1h", -3*time.Hour), // technically in range but we will not look here
createBlock("20240923T10.1h", 0),
},
queryStart: createTime("2024-09-23T05:00:00.000Z"),
queryEnd: createTime("2024-09-23T06:00:00.000Z"),
want: 3,
},
{
name: "out of order ingestion (ahead of time)",
blocks: []*metastorev1.BlockMeta{
createBlock("20240923T06.1h", 2*time.Hour), // technically in range but we will not look here
createBlock("20240923T07.1h", 1*time.Hour), // in range
createBlock("20240923T07.1h", 3*time.Hour), // too new
createBlock("20240923T08.1h", 0), // in range
createBlock("20240923T08.1h", 1*time.Hour), // in range
createBlock("20240923T10.1h", 0),
},
queryStart: createTime("2024-09-23T08:00:00.000Z"),
queryEnd: createTime("2024-09-23T09:00:00.000Z"),
want: 3,
},
}

partitions := []*partitionBlocks{
{key: "20240923T06.1h", blocks: []*metastorev1.BlockMeta{
{Id: createUlidString("2024-09-23T06:13:23.123Z")},
{Id: createUlidString("2024-09-23T06:24:23.123Z")},
}},
{key: "20240923T07.1h", blocks: []*metastorev1.BlockMeta{
{Id: createUlidString("2024-09-23T07:13:23.123Z")},
{Id: createUlidString("2024-09-23T07:24:23.123Z")},
}},
{key: "20240923T08.1h", blocks: []*metastorev1.BlockMeta{
{Id: createUlidString("2024-09-23T08:13:23.123Z")},
{Id: createUlidString("2024-09-23T08:24:23.123Z")},
}},
{key: "20240923T09.1h", blocks: []*metastorev1.BlockMeta{
{Id: createUlidString("2024-09-23T09:13:23.123Z")},
{Id: createUlidString("2024-09-23T09:24:23.123Z")},
}},
{key: "20240923T10.1h", blocks: []*metastorev1.BlockMeta{
{Id: createUlidString("2024-09-23T10:13:23.123Z")},
{Id: createUlidString("2024-09-23T10:24:23.123Z")},
}},
}
keys := make([]index.PartitionKey, 0, len(partitions))
for _, partition := range partitions {
keys = append(keys, partition.key)
}
store.On("ListPartitions").Return(keys)
for _, p := range partitions {
mockPartition(store, p.key, p.blocks)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
store := mockindex.NewMockStore(t)
i := index.NewIndex(store, util.Logger, &index.DefaultConfig)
for _, b := range tt.blocks {
i.InsertBlock(b)
}
tenantMap := make(map[string]struct{})
for _, t := range tt.queryTenants {
tenantMap[t] = struct{}{}
}
found, err := i.FindBlocksInRange(tt.queryStart, tt.queryEnd, tenantMap)
require.NoError(t, err)
require.Equal(t, tt.want, len(found))
for _, b := range found {
require.Truef(
t,
tt.queryStart < b.MaxTime && tt.queryEnd >= b.MinTime,
"block %s is not in range, %v : %v", b.Id, time.UnixMilli(b.MinTime).UTC(), time.UnixMilli(b.MaxTime).UTC())
}
})
}
i.LoadPartitions()

found, err := i.FindBlocksInRange(createTime("2024-09-23T08:00:00.123Z"), createTime("2024-09-23T09:00:00.123Z"), map[string]struct{}{})
require.NoError(t, err)
require.Len(t, found, 8)
}

func mockPartition(store *mockindex.MockStore, key index.PartitionKey, blocks []*metastorev1.BlockMeta) {
Expand Down Expand Up @@ -186,18 +229,20 @@ func TestIndex_InsertBlock(t *testing.T) {
block := &metastorev1.BlockMeta{
Id: createUlidString("2024-09-23T08:00:00.123Z"),
TenantId: "tenant-1",
MinTime: createTime("2024-09-23T08:00:00.000Z"),
MaxTime: createTime("2024-09-23T08:05:00.000Z"),
}

i.InsertBlock(block)
require.NotNil(t, i.FindBlock(0, "tenant-1", block.Id))
blocks, err := i.FindBlocksInRange(0, math.MaxInt64, map[string]struct{}{"tenant-1": {}})
blocks, err := i.FindBlocksInRange(createTime("2024-09-23T07:00:00.000Z"), createTime("2024-09-23T09:00:00.000Z"), map[string]struct{}{"tenant-1": {}})
require.NoError(t, err)
require.Len(t, blocks, 1)
require.Equal(t, block, blocks[0])

// inserting the block again is a noop
i.InsertBlock(block)
blocks, err = i.FindBlocksInRange(0, math.MaxInt64, map[string]struct{}{"tenant-1": {}})
blocks, err = i.FindBlocksInRange(createTime("2024-09-23T07:00:00.000Z"), createTime("2024-09-23T09:00:00.000Z"), map[string]struct{}{"tenant-1": {}})
require.NoError(t, err)
require.Len(t, blocks, 1)
require.Equal(t, block, blocks[0])
Expand Down Expand Up @@ -311,3 +356,13 @@ func createTime(t string) int64 {
ts, _ := time.Parse(time.RFC3339, t)
return ts.UnixMilli()
}

// createBlock builds a BlockMeta test fixture from a partition key.
//
// The key is parsed as an index.PartitionKey to obtain the partition
// timestamp. The block Id is generated from that timestamp without the
// offset. MinTime is the timestamp plus offset, and MaxTime is five minutes
// after MinTime, both in Unix milliseconds. A non-zero offset therefore gives
// a block whose time range differs from the partition timestamp its Id is
// derived from.
//
// createBlock panics if the key cannot be parsed, so a mistyped key fails
// the test immediately instead of producing a block with a meaningless
// timestamp.
func createBlock(key string, offset time.Duration) *metastorev1.BlockMeta {
	pKey := index.PartitionKey(key)
	// NOTE(review): assumes the third value returned by Parse is an error;
	// confirm against the PartitionKey definition.
	ts, _, err := pKey.Parse()
	if err != nil {
		panic("createBlock: cannot parse partition key " + key + ": " + err.Error())
	}
	minTime := ts.Add(offset)
	return &metastorev1.BlockMeta{
		Id:      createUlidString(ts.Format(time.RFC3339)),
		MinTime: minTime.UnixMilli(),
		MaxTime: minTime.Add(5 * time.Minute).UnixMilli(),
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package metastore

import (
"crypto/rand"
"math"
"testing"
"time"

Expand Down Expand Up @@ -42,10 +41,7 @@ func Test_JobAssignments(t *testing.T) {
func Test_StatusUpdates_Success(t *testing.T) {
// add enough blocks to create 2 jobs
m := initState(t)
addLevel0Blocks(m, 40)
sourceBlocks, err := m.index.FindBlocksInRange(0, math.MaxInt64, map[string]struct{}{"": {}})
require.NoError(t, err)
require.Equal(t, 40, len(sourceBlocks))
sourceBlocks := addLevel0Blocks(m, 40)
require.Equal(t, 2, len(m.compactionJobQueue.jobs))

// assign the 2 jobs
Expand Down Expand Up @@ -292,12 +288,10 @@ func Test_PanicWithDbErrors(t *testing.T) {

func Test_RemoveInvalidJobsFromStorage(t *testing.T) {
m := initState(t)
addLevel0Blocks(m, 20)
blocks := addLevel0Blocks(m, 20)
require.Equal(t, 1, len(m.compactionJobQueue.jobs), "there should be one job in the queue")

// delete all blocks, making the existing job invalid
blocks, err := m.index.FindBlocksInRange(0, math.MaxInt64, map[string]struct{}{"": {}})
require.NoError(t, err)
sources := make([]string, 0, 20)
for _, block := range blocks {
sources = append(sources, block.Id)
Expand All @@ -312,15 +306,20 @@ func Test_RemoveInvalidJobsFromStorage(t *testing.T) {
verifyCompactionState(t, m)
}

func addLevel0Blocks(m *metastoreState, count int) {
func addLevel0Blocks(m *metastoreState, count int) []*metastorev1.BlockMeta {
blocks := make([]*metastorev1.BlockMeta, 0, count)
for i := 0; i < count; i++ {
b := createBlock(0, "", 0)
b.MinTime = time.Now().UnixMilli()
b.MaxTime = time.Now().UnixMilli()
blocks = append(blocks, b)
raftLog := &raft.Log{
Index: uint64(i),
AppendedAt: time.Unix(0, int64(i)),
}
_, _ = m.applyAddBlock(raftLog, &metastorev1.AddBlockRequest{Block: b})
}
return blocks
}

func addLevel1Blocks(m *metastoreState, tenant string, count int) {
Expand Down

0 comments on commit 5755b3e

Please sign in to comment.