Support float histogram in store gateway #6925

Merged
merged 5 commits on Nov 27, 2023
Changes from 3 commits
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
@@ -1251,7 +1251,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
hasher := hashPool.Get().(hash.Hash64)
defer hashPool.Put(hasher)

if in.Encoding() == chunkenc.EncXOR || in.Encoding() == chunkenc.EncHistogram {
if in.Encoding() == chunkenc.EncXOR || in.Encoding() == chunkenc.EncHistogram || in.Encoding() == chunkenc.EncFloatHistogram {
b, err := save(in.Bytes())
if err != nil {
return err
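For context, here is a minimal standalone sketch (not part of the diff) of the idea behind this change: float histogram chunks report `chunkenc.EncFloatHistogram`, so the raw-bytes path in `populateChunk` can treat them the same way it already treats XOR and integer histogram chunks. The helper name `isRawChunkEncoding` is illustrative only.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// isRawChunkEncoding sketches the encoding check extended above: chunks whose
// raw bytes can be copied through now include float histogram chunks.
func isRawChunkEncoding(c chunkenc.Chunk) bool {
	switch c.Encoding() {
	case chunkenc.EncXOR, chunkenc.EncHistogram, chunkenc.EncFloatHistogram:
		return true
	default:
		return false
	}
}

func main() {
	c := chunkenc.NewFloatHistogramChunk()
	fmt.Printf("encoding=%s raw=%v\n", c.Encoding(), isRawChunkEncoding(c))
}
```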
7 changes: 7 additions & 0 deletions pkg/store/bucket_test.go
@@ -1301,6 +1301,13 @@ func TestBucketHistogramSeries(t *testing.T) {
})
}

func TestBucketFloatHistogramSeries(t *testing.T) {
tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
benchBucketSeries(t, chunkenc.ValFloatHistogram, false, false, samplesPerSeries, series, 1)
})
}

func TestBucketSkipChunksSeries(t *testing.T) {
tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
35 changes: 35 additions & 0 deletions pkg/store/storepb/testutil/series.go
@@ -120,6 +120,8 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head,
appendFloatSamples(t, app, tsLabel, opts)
case chunkenc.ValHistogram:
appendHistogramSamples(t, app, tsLabel, opts)
case chunkenc.ValFloatHistogram:
appendFloatHistogramSamples(t, app, tsLabel, opts)
}
}
testutil.Ok(t, app.Commit())
@@ -222,6 +224,39 @@ func appendHistogramSamples(t testing.TB, app storage.Appender, tsLabel int, opt
}
}

func appendFloatHistogramSamples(t testing.TB, app storage.Appender, tsLabel int, opts HeadGenOptions) {
sample := &histogram.FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 15,
Sum: 11.5,
PositiveSpans: []histogram.Span{
{Offset: -2, Length: 2},
{Offset: 1, Length: 3},
},
PositiveBuckets: []float64{0.5, 0, 1.5, 2, 3.5},
NegativeSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 3, Length: 2},
},
NegativeBuckets: []float64{1.5, 0.5, 2.5, 3},
}

ref, err := app.AppendHistogram(
0,
labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", tsLabel, LabelLongSuffix), "j", fmt.Sprintf("%v", tsLabel)),
int64(tsLabel)*opts.ScrapeInterval.Milliseconds(),
nil,
sample,
)
testutil.Ok(t, err)

for is := 1; is < opts.SamplesPerSeries; is++ {
_, err := app.AppendHistogram(ref, labels.EmptyLabels(), int64(tsLabel+is)*opts.ScrapeInterval.Milliseconds(), nil, sample)
testutil.Ok(t, err)
}
}

// SeriesServer is test gRPC storeAPI series server.
type SeriesServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
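As a rough, standalone illustration of the pattern `appendFloatHistogramSamples` relies on (a sketch, assuming the Prometheus TSDB head options used elsewhere in this test utility; the sample values, label set, and timestamps here are arbitrary): `storage.Appender.AppendHistogram` takes `nil` for the integer histogram and the float histogram as its last argument.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	dir, err := os.MkdirTemp("", "float-histogram-head")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	opts := tsdb.DefaultHeadOptions()
	opts.ChunkDirRoot = dir
	opts.ChunkRange = int64(2 * time.Hour / time.Millisecond)
	// Without this the head rejects native histogram samples.
	opts.EnableNativeHistograms.Store(true)

	head, err := tsdb.NewHead(nil, nil, nil, nil, opts, nil)
	if err != nil {
		panic(err)
	}
	defer head.Close()

	fh := &histogram.FloatHistogram{
		ZeroThreshold:   0.01,
		ZeroCount:       5.5,
		Count:           15,
		Sum:             11.5,
		PositiveSpans:   []histogram.Span{{Offset: -2, Length: 2}, {Offset: 1, Length: 3}},
		PositiveBuckets: []float64{0.5, 0, 1.5, 2, 3.5},
	}

	app := head.Appender(context.Background())
	// The first append resolves the series ref; passing a nil *histogram.Histogram
	// together with a non-nil *histogram.FloatHistogram selects the float variant.
	ref, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar"), 0, nil, fh)
	if err != nil {
		panic(err)
	}
	for i := 1; i < 5; i++ {
		if _, err := app.AppendHistogram(ref, labels.EmptyLabels(), int64(i)*15_000, nil, fh); err != nil {
			panic(err)
		}
	}
	if err := app.Commit(); err != nil {
		panic(err)
	}
	fmt.Println("series in head:", head.NumSeries())
}
```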
61 changes: 49 additions & 12 deletions pkg/testutil/e2eutil/prometheus.go
@@ -57,18 +57,37 @@ const (
PromAddrPlaceHolder = "PROMETHEUS_ADDRESS"
)

var histogramSample = histogram.Histogram{
Schema: 0,
Count: 20,
Sum: -3.1415,
ZeroCount: 12,
ZeroThreshold: 0.001,
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1},
}
var (
histogramSample = histogram.Histogram{
Schema: 0,
Count: 20,
Sum: -3.1415,
ZeroCount: 12,
ZeroThreshold: 0.001,
NegativeSpans: []histogram.Span{
{Offset: 0, Length: 4},
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{1, 2, -2, 1, -1},
}

floatHistogramSample = histogram.FloatHistogram{
ZeroThreshold: 0.01,
ZeroCount: 5.5,
Count: 15,
Sum: 11.5,
PositiveSpans: []histogram.Span{
{Offset: -2, Length: 2},
{Offset: 1, Length: 3},
},
PositiveBuckets: []float64{0.5, 0, 1.5, 2, 3.5},
NegativeSpans: []histogram.Span{
{Offset: 3, Length: 2},
{Offset: 3, Length: 2},
},
NegativeBuckets: []float64{1.5, 0.5, 2.5, 3},
}
)

func PrometheusBinary() string {
return "prometheus-" + defaultPrometheusVersion
@@ -463,6 +482,22 @@ func CreateHistogramBlockWithDelay(
return createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValHistogram)
}

// CreateFloatHistogramBlockWithDelay writes a block with the given float native histogram series and numSamples samples each.
// Samples will be in the time range [mint, maxt).
func CreateFloatHistogramBlockWithDelay(
ctx context.Context,
dir string,
series []labels.Labels,
numSamples int,
mint, maxt int64,
blockDelay time.Duration,
extLset labels.Labels,
resolution int64,
hashFunc metadata.HashFunc,
) (id ulid.ULID, err error) {
return createBlockWithDelay(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, hashFunc, chunkenc.ValFloatHistogram)
}

func createBlockWithDelay(ctx context.Context, dir string, series []labels.Labels, numSamples int, mint int64, maxt int64, blockDelay time.Duration, extLset labels.Labels, resolution int64, hashFunc metadata.HashFunc, samplesType chunkenc.ValueType) (ulid.ULID, error) {
blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false, hashFunc, samplesType)
if err != nil {
@@ -545,6 +580,8 @@ func createBlock(
randMutex.Unlock()
} else if sampleType == chunkenc.ValHistogram {
_, err = app.AppendHistogram(0, lset, t, &histogramSample, nil)
} else if sampleType == chunkenc.ValFloatHistogram {
_, err = app.AppendHistogram(0, lset, t, nil, &floatHistogramSample)
}
if err != nil {
if rerr := app.Rollback(); rerr != nil {
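A hedged usage sketch of the new helper, mirroring the call the e2e test below makes; the output directory, series, and external labels here are placeholders rather than values from the PR.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/timestamp"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)

func main() {
	ctx := context.Background()
	now := time.Now()

	// Placeholder series and external labels for illustration.
	series := []labels.Labels{labels.FromStrings("a", "1", "b", "4")}
	extLset := labels.FromStrings("ext1", "value3", "replica", "2")

	id, err := e2eutil.CreateFloatHistogramBlockWithDelay(
		ctx,
		"/tmp/blocks", // hypothetical output directory
		series,
		10, // samples per series
		timestamp.FromTime(now),
		timestamp.FromTime(now.Add(2*time.Hour)),
		30*time.Minute, // block delay
		extLset,
		0, // resolution
		metadata.NoneFunc,
	)
	if err != nil {
		panic(err)
	}
	fmt.Println("wrote float histogram block", id)
}
```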
100 changes: 70 additions & 30 deletions test/e2e/store_gateway_test.go
@@ -96,11 +96,13 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))

floatSeries := []labels.Labels{labels.FromStrings("a", "1", "b", "2")}
nativeHistogramSeries := []labels.Labels{labels.FromStrings("a", "1", "b", "3")}
floatHistogramSeries := []labels.Labels{labels.FromStrings("a", "1", "b", "4")}
extLset := labels.FromStrings("ext1", "value1", "replica", "1")
extLset2 := labels.FromStrings("ext1", "value1", "replica", "2")
extLset3 := labels.FromStrings("ext1", "value2", "replica", "3")
extLset4 := labels.FromStrings("ext1", "value1", "replica", "3")
extLset5 := labels.FromStrings("ext1", "value3", "replica", "1")
extLset6 := labels.FromStrings("ext1", "value3", "replica", "2")

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
t.Cleanup(cancel)
@@ -116,6 +118,8 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, err)
id5, err := e2eutil.CreateHistogramBlockWithDelay(ctx, dir, nativeHistogramSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset5, 0, metadata.NoneFunc)
testutil.Ok(t, err)
id6, err := e2eutil.CreateFloatHistogramBlockWithDelay(ctx, dir, floatHistogramSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset6, 0, metadata.NoneFunc)
testutil.Ok(t, err)
l := log.NewLogfmtLogger(os.Stdout)
bkt, err := s3.NewBucketWithConfig(l,
e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed")
@@ -126,13 +130,14 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id3.String()), id3.String()))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id4.String()), id4.String()))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id5.String()), id5.String()))
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id6.String()), id6.String()))

// Wait for store to sync blocks.
// thanos_blocks_meta_synced: 2x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced"))
// thanos_blocks_meta_synced: 4x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))

@@ -161,23 +166,29 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "1",
"b": "4",
"ext1": "value3",
"replica": "2",
},
},
)

// 2 x postings, 3 x series, 2 x chunks.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(9), "thanos_bucket_store_series_data_touched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(9), "thanos_bucket_store_series_data_fetched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_series_blocks_queried"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_touched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_fetched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_bucket_store_series_blocks_queried"))

tenant1Opts := []e2emon.MetricsOption{
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-1")),
e2emon.WaitMissingMetrics(),
}

// Test per tenant store metrics
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_touched"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_fetched"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_bucket_store_series_blocks_queried"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(12), []string{"thanos_bucket_store_series_data_touched"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(12), []string{"thanos_bucket_store_series_data_fetched"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(4), []string{"thanos_bucket_store_series_blocks_queried"}, tenant1Opts...))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
time.Now, promclient.QueryOptions{
@@ -195,37 +206,42 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"b": "3",
"ext1": "value3",
},
{
"a": "1",
"b": "4",
"ext1": "value3",
},
},
)

tenant2Opts := []e2emon.MetricsOption{
e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, tenancy.MetricLabel, "test-tenant-2")),
e2emon.WaitMissingMetrics(),
}
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(18), "thanos_bucket_store_series_data_touched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_fetched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3+3), "thanos_bucket_store_series_blocks_queried"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(24), "thanos_bucket_store_series_data_touched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(16), "thanos_bucket_store_series_data_fetched"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4+4), "thanos_bucket_store_series_blocks_queried"))

// Test some tenant-specific store metrics
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_touched"}, tenant2Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_bucket_store_series_data_fetched"}, tenant2Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_bucket_store_series_blocks_queried"}, tenant2Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(12), []string{"thanos_bucket_store_series_data_touched"}, tenant2Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(4), []string{"thanos_bucket_store_series_data_fetched"}, tenant2Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(4), []string{"thanos_bucket_store_series_blocks_queried"}, tenant2Opts...))

// the first tenant's metrics should be unaffected by the additional query
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_touched"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(9), []string{"thanos_bucket_store_series_data_fetched"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_bucket_store_series_blocks_queried"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(12), []string{"thanos_bucket_store_series_data_touched"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(12), []string{"thanos_bucket_store_series_data_fetched"}, tenant1Opts...))
testutil.Ok(t, s1.WaitSumMetricsWithOptions(e2emon.Equals(4), []string{"thanos_bucket_store_series_blocks_queried"}, tenant1Opts...))

})
t.Run("remove meta.json from id1 block", func(t *testing.T) {
testutil.Ok(t, bkt.Delete(ctx, filepath.Join(id1.String(), block.MetaFilename)))

// Wait for store to sync blocks.
// thanos_blocks_meta_synced: 1x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced"))
// thanos_blocks_meta_synced: 3x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))

@@ -247,21 +263,27 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "1",
"b": "4",
"ext1": "value3",
"replica": "2",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4+4), "thanos_bucket_store_series_blocks_queried"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(8+3), "thanos_bucket_store_series_blocks_queried"))
})
t.Run("upload block id5, similar to id1", func(t *testing.T) {
id5, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, floatSeries, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset4, 0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id5.String()), id5.String()))

// Wait for store to sync blocks.
// thanos_blocks_meta_synced: 2x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced"))
// thanos_blocks_meta_synced: 4x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(7), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))

@@ -288,19 +310,25 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "1",
"b": "4",
"ext1": "value3",
"replica": "2",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(11+2), "thanos_bucket_store_series_blocks_queried"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(11+4), "thanos_bucket_store_series_blocks_queried"))
})
t.Run("delete whole id2 block #yolo", func(t *testing.T) {
testutil.Ok(t, block.Delete(ctx, l, bkt, id2))

// Wait for store to sync blocks.
// thanos_blocks_meta_synced: 1x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_blocks_meta_synced"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))

testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(3), "thanos_bucket_store_blocks_loaded"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1+1), "thanos_bucket_store_block_drops_total"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))

@@ -321,9 +349,15 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "1",
"b": "4",
"ext1": "value3",
"replica": "2",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(14+1), "thanos_bucket_store_series_blocks_queried"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(15+3), "thanos_bucket_store_series_blocks_queried"))
})

t.Run("negative offset should work", func(t *testing.T) {
@@ -344,9 +378,15 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
"ext1": "value3",
"replica": "1",
},
{
"a": "1",
"b": "4",
"ext1": "value3",
"replica": "2",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(15+2), "thanos_bucket_store_series_blocks_queried"))
testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(18+3), "thanos_bucket_store_series_blocks_queried"))
})

// TODO(khyati) Let's add some case for compaction-meta.json once the PR will be merged: https://github.com/thanos-io/thanos/pull/2136.
Expand Down