Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive/rule: Fixed Segfault issue; Added tests & benchmarks for TSDB Store, fixed multitsdb benchmarks. #3046

Merged
merged 1 commit into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ We also have example Grafana dashboards [here](/examples/dashboards/dashboards.m
* [Monzo user story](https://monzo.com/blog/2018/07/27/how-we-monitor-monzo)
* [Banzai Cloud hand's on](https://banzaicloud.com/blog/hands-on-thanos/)
* [uSwitch user story](https://medium.com/uswitch-labs/making-prometheus-more-awesome-with-thanos-fbec8c6c28ad)
* [Thanos usage](https://www.infracloud.io/thanos-ha-scalable-prometheus/)
* [Thanos usage](https://www.infracloud.io/blogs/thanos-ha-scalable-prometheus/)

## Integrations

Expand Down
21 changes: 13 additions & 8 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
var (
logger = log.NewNopLogger()
blocks []*bucketBlock
series []storepb.Series
series []*storepb.Series
random = rand.New(rand.NewSource(120))
)

Expand Down Expand Up @@ -1210,7 +1210,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
// This allows to pick time range that will correspond to number of series picked 1:1.
for bi := 0; bi < numOfBlocks; bi++ {
head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)),
SamplesPerSeries: samplesPerSeriesPerBlock,
Series: seriesPerBlock,
PrependLabels: extLset,
Expand Down Expand Up @@ -1533,17 +1533,22 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
}

// Create TSDB blocks.
opts := storetestutil.HeadGenOptions{
Dir: tmpDir,
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "0"),
SamplesPerSeries: 1,
Series: 2,
PrependLabels: extLset,
Random: random,
}
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, opts)
})
block1 := createBlockFromHead(t, bktDir, head)
testutil.Ok(t, head.Close())
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, opts)
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "1"),
SamplesPerSeries: 1,
Series: 2,
PrependLabels: extLset,
Random: random,
})
block2 := createBlockFromHead(t, bktDir, head2)
testutil.Ok(t, head2.Close())

Expand Down Expand Up @@ -1607,7 +1612,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
},
ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedHints: []hintspb.SeriesResponseHints{
{
QueriedBlocks: []hintspb.Block{
Expand Down
27 changes: 26 additions & 1 deletion pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ package store
import (
"context"
"fmt"
"io"
"sync"

"github.com/go-kit/kit/log"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/runutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -96,6 +99,8 @@ type tenantSeriesSetServer struct {

err error
tenant string

closers []io.Closer
}

// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
Expand Down Expand Up @@ -156,6 +161,18 @@ func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
}
}

func (s *tenantSeriesSetServer) Delegate(closer io.Closer) {
s.closers = append(s.closers, closer)
}

func (s *tenantSeriesSetServer) Close() error {
var merr tsdb_errors.MultiError
for _, c := range s.closers {
merr.Add(c.Close())
}
return merr.Err()
}

func (s *tenantSeriesSetServer) Next() (ok bool) {
s.cur, ok = <-s.recv
return ok
Expand Down Expand Up @@ -188,6 +205,7 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respCh := newCancelableRespChannel(gctx, 10)

var closers []io.Closer
g.Go(func() error {
// This go routine is responsible for calling store's Series concurrently. Merged results
// are passed to respCh and sent concurrently to client (if buffer of 10 have room).
Expand Down Expand Up @@ -216,6 +234,8 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
defer wg.Done()
ss.Series(store, r)
}()

closers = append(closers, ss)
seriesSet = append(seriesSet, ss)
}

Expand All @@ -237,7 +257,12 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
}
return nil
})
return g.Wait()
err := g.Wait()
for _, c := range closers {
runutil.CloseWithLogOnErr(s.logger, c, "close tenant series request")
}
return err

}

// LabelNames returns all known label names.
Expand Down
23 changes: 9 additions & 14 deletions pkg/store/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,24 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
}
}()
for j := range dbs {
tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))

head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: tsdbDir,
SamplesPerSeries: samplesPerSeriesPerTSDB,
Series: seriesPerTSDB,
WithWAL: true,
WithWAL: !flushToBlocks,
Random: random,
SkipChunks: t.IsBenchmark(),
})
testutil.Ok(t, head.Close())

tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))

for i := 0; i < len(created); i++ {
resps[j] = append(resps[j], storepb.NewSeriesResponse(&created[i]))
resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i]))
}

if flushToBlocks {
db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
testutil.Ok(t, err)

testutil.Ok(t, db.FlushWAL(tmpDir))
testutil.Ok(t, db.Close())
_ = createBlockFromHead(t, tsdbDir, head)
}
testutil.Ok(t, head.Close())

db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
testutil.Ok(t, err)
Expand All @@ -128,7 +123,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB

store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })

var expected []storepb.Series
var expected []*storepb.Series
lastLabels := storepb.Series{}
for _, resp := range resps {
for _, r := range resp {
Expand All @@ -140,7 +135,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
continue
}
lastLabels = x
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}
}

Expand Down
15 changes: 7 additions & 8 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -1616,17 +1617,16 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
var resps []*storepb.SeriesResponse

head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", j)),
SamplesPerSeries: samplesPerSeriesPerClient,
Series: seriesPerClient,
MaxFrameBytes: storetestutil.RemoteReadFrameLimit,
Random: random,
SkipChunks: t.IsBenchmark(),
})
testutil.Ok(t, head.Close())

for i := 0; i < len(created); i++ {
resps = append(resps, storepb.NewSeriesResponse(&created[i]))
resps = append(resps, storepb.NewSeriesResponse(created[i]))
}

clients[j] = &testClient{
Expand All @@ -1647,23 +1647,22 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
}

var allResps []*storepb.SeriesResponse
var expected []storepb.Series
var expected []*storepb.Series
lastLabels := storepb.Series{}
for _, c := range clients {
m := c.(*testClient).StoreClient.(*mockedStoreAPI)

// NOTE: Proxy will merge all series with same labels without any frame limit (https://github.com/thanos-io/thanos/issues/2332).
for _, r := range m.RespSeries {
allResps = append(allResps, r)

// Proxy will merge all series with same labels without limit (https://github.com/thanos-io/thanos/issues/2332).
// Let's do this here as well.
x := storepb.Series{Labels: r.GetSeries().Labels}
if x.String() == lastLabels.String() {
expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...)
continue
}
lastLabels = x
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}

}
Expand Down Expand Up @@ -1700,7 +1699,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
// In this we expect exactly the same response as input.
expected = expected[:0]
for _, r := range allResps {
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}
storetestutil.TestServerSeries(t, store,
&storetestutil.SeriesCase{
Expand Down
Loading