Skip to content

Commit

Permalink
add no-chunk option in series API
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <yb532204897@gmail.com>

add e2e tests

Signed-off-by: yeya24 <yb532204897@gmail.com>

add tests for all stores

Signed-off-by: yeya24 <yb532204897@gmail.com>

add changelog

Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Dec 18, 2019
1 parent 56abeab commit 547467b
Show file tree
Hide file tree
Showing 18 changed files with 719 additions and 242 deletions.
Empty file removed .dep-finished
Empty file.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering.
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts.
- [#1904](https://github.com/thanos-io/thanos/pull/1904) Add a no-chunk option in Store Series API to improve the response time of `/api/v1/series` endpoint.

## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03

Expand Down
10 changes: 5 additions & 5 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
defer span.Finish()

qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse),
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -425,7 +425,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -502,7 +502,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse).
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
Expand Down Expand Up @@ -606,7 +606,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

q, err := api.queryableCreate(true, nil, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ import (
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, noChunk bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator {
return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool) storage.Queryable {
return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, noChunk bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
noChunk: noChunk,
}
}
}
Expand All @@ -42,11 +43,12 @@ type queryable struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
noChunk bool
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.noChunk), nil
}

type querier struct {
Expand All @@ -59,6 +61,7 @@ type querier struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
noChunk bool
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
Expand All @@ -72,6 +75,7 @@ func newQuerier(
deduplicate bool,
maxResolutionMillis int64,
partialResponse bool,
noChunk bool,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -93,6 +97,7 @@ func newQuerier(
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
noChunk: noChunk,
}
}

Expand Down Expand Up @@ -185,6 +190,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: queryAggrs,
PartialResponseDisabled: !q.partialResponse,
NoChunk: q.noChunk,
}, resp); err != nil {
return nil, nil, errors.Wrap(err, "proxy Series()")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, testProxy)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, nil, oneHourMillis, false)
queryable := queryableCreator(false, nil, oneHourMillis, false, false)

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
Expand All @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
},
}

q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false)
q := NewQueryableCreator(nil, testProxy)(false, nil, 9999999, false, false)

engine := promql.NewEngine(
promql.EngineOpts{
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestQuerier_Series(t *testing.T) {

// Querier clamps the range to [1,300], which should drop some samples of the result above.
// The store API allows endpoints to send more data then initially requested.
q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true)
q := newQuerier(context.Background(), nil, 1, 300, []string{""}, testProxy, false, 0, true, false)
defer func() { testutil.Ok(t, q.Close()) }()

res, _, err := q.Select(&storage.SelectParams{})
Expand Down
13 changes: 9 additions & 4 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,11 +945,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
for set.Next() {
var series storepb.Series

series.Labels, series.Chunks = set.At()

stats.mergedSeriesCount++
stats.mergedChunksCount += len(series.Chunks)
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))

if req.NoChunk {
series.Labels, _ = set.At()
} else {
series.Labels, series.Chunks = set.At()

stats.mergedChunksCount += len(series.Chunks)
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
}

if err := srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
Expand Down
45 changes: 42 additions & 3 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {

// TODO(bwplotka): Add those test cases to TSDB querier_test.go as well, there are no tests for matching.
for i, tcase := range []struct {
req *storepb.SeriesRequest
expected [][]storepb.Label
req *storepb.SeriesRequest
expected [][]storepb.Label
expectedChunkLen int
}{
{
req: &storepb.SeriesRequest{
Expand All @@ -183,7 +184,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -202,7 +205,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -217,7 +222,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -232,7 +239,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -251,7 +260,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -270,7 +281,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -285,7 +298,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
Expand All @@ -299,6 +314,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
},
{
Expand All @@ -308,7 +324,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -323,7 +341,9 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
expectedChunkLen: 3,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
Expand All @@ -345,6 +365,25 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
},
MinTime: mint,
MaxTime: maxt,
NoChunk: false,
},
},
// Test no-chunk option.
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
NoChunk: true,
},
expectedChunkLen: 0,
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
} {
Expand All @@ -357,7 +396,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {

for i, s := range srv.SeriesSet {
testutil.Equals(t, tcase.expected[i], s.Labels)
testutil.Equals(t, 3, len(s.Chunks))
testutil.Equals(t, tcase.expectedChunkLen, len(s.Chunks))
}
}
}
Expand Down
52 changes: 52 additions & 0 deletions pkg/store/matchers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package store

import (
"strings"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -33,3 +35,53 @@ func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err er
}
return res, nil
}

// matchersToMetric converts label matchers to metric format.
func matchersToMetric(ms []storepb.LabelMatcher) string {
var (
res string
nameMatchers []*labels.Matcher
otherMatchers []string
)

matchers, err := translateMatchers(ms)
if err != nil {
return ""
}

for _, m := range matchers {
if m.Name == "__name__" {
nameMatchers = append(nameMatchers, m)
} else {
otherMatchers = append(otherMatchers, m.String())
}
}

res = strings.Join(otherMatchers, ", ")

if len(nameMatchers) == 1 {
// If there is only one equal name matcher, we can put it outside the {}.
if nameMatchers[0].Type == labels.MatchEqual {
if len(otherMatchers) == 0 {
return nameMatchers[0].Value
}
return nameMatchers[0].Value + "{" + res + "}"
} else {
if len(otherMatchers) == 0 {
return "{" + nameMatchers[0].String() + "}"
} else {
return "{" + nameMatchers[0].String() + ", " + res + "}"
}
}
}

// If more than one name matcher, we need to put them all inside {}.
for i, v := range nameMatchers {
res += v.String()
if i != len(nameMatchers)-1 {
res += ", "
}
}

return "{" + res + "}"
}
Loading

0 comments on commit 547467b

Please sign in to comment.