Skip to content

Commit

Permalink
receive/rule: Fixed Segfault issue; Added tests & benchmarks for TSDB…
Browse files Browse the repository at this point in the history
…Store, fixed multitsdb benchmarks.

Fixed #3013

Also:
* Fixed another quite big issue with reusing the chunk slice.
* Fixed framing - previously it was wrongly sending single-chunk frames, taking
a huge amount of time.

Fix: We delegate the closer now to ensure multitsdb operates on valid data.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

# Conflicts:
#	pkg/store/tsdb_test.go
#	pkg/testutil/testutil.go
  • Loading branch information
bwplotka committed Aug 25, 2020
1 parent f32877f commit c513dab
Show file tree
Hide file tree
Showing 9 changed files with 578 additions and 91 deletions.
2 changes: 1 addition & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ We also have example Grafana dashboards [here](/examples/dashboards/dashboards.m
* [Monzo user story](https://monzo.com/blog/2018/07/27/how-we-monitor-monzo)
* [Banzai Cloud hand's on](https://banzaicloud.com/blog/hands-on-thanos/)
* [uSwitch user story](https://medium.com/uswitch-labs/making-prometheus-more-awesome-with-thanos-fbec8c6c28ad)
* [Thanos usage](https://www.infracloud.io/thanos-ha-scalable-prometheus/)
* [Thanos usage](https://www.infracloud.io/blogs/thanos-ha-scalable-prometheus/)

## Integrations

Expand Down
21 changes: 13 additions & 8 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
var (
logger = log.NewNopLogger()
blocks []*bucketBlock
series []storepb.Series
series []*storepb.Series
random = rand.New(rand.NewSource(120))
)

Expand Down Expand Up @@ -1210,7 +1210,7 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
// This allows to pick time range that will correspond to number of series picked 1:1.
for bi := 0; bi < numOfBlocks; bi++ {
head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", bi)),
SamplesPerSeries: samplesPerSeriesPerBlock,
Series: seriesPerBlock,
PrependLabels: extLset,
Expand Down Expand Up @@ -1533,17 +1533,22 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
}

// Create TSDB blocks.
opts := storetestutil.HeadGenOptions{
Dir: tmpDir,
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "0"),
SamplesPerSeries: 1,
Series: 2,
PrependLabels: extLset,
Random: random,
}
head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, opts)
})
block1 := createBlockFromHead(t, bktDir, head)
testutil.Ok(t, head.Close())
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, opts)
head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "1"),
SamplesPerSeries: 1,
Series: 2,
PrependLabels: extLset,
Random: random,
})
block2 := createBlockFromHead(t, bktDir, head2)
testutil.Ok(t, head2.Close())

Expand Down Expand Up @@ -1607,7 +1612,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
},
ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedHints: []hintspb.SeriesResponseHints{
{
QueriedBlocks: []hintspb.Block{
Expand Down
27 changes: 26 additions & 1 deletion pkg/store/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ package store
import (
"context"
"fmt"
"io"
"sync"

"github.com/go-kit/kit/log"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/runutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -96,6 +99,8 @@ type tenantSeriesSetServer struct {

err error
tenant string

closers []io.Closer
}

// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
Expand Down Expand Up @@ -156,6 +161,18 @@ func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
}
}

// Delegate registers closer to be closed together with this server when
// Close is called, transferring ownership of the resource to the server
// so it stays valid for the lifetime of the streamed response.
func (s *tenantSeriesSetServer) Delegate(closer io.Closer) {
	s.closers = append(s.closers, closer)
}

// Close closes every delegated closer, collecting all failures and
// returning them combined as a single error (nil if none failed).
func (s *tenantSeriesSetServer) Close() error {
	var errs tsdb_errors.MultiError
	for _, closer := range s.closers {
		errs.Add(closer.Close())
	}
	return errs.Err()
}

func (s *tenantSeriesSetServer) Next() (ok bool) {
s.cur, ok = <-s.recv
return ok
Expand Down Expand Up @@ -188,6 +205,7 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respCh := newCancelableRespChannel(gctx, 10)

var closers []io.Closer
g.Go(func() error {
// This go routine is responsible for calling store's Series concurrently. Merged results
// are passed to respCh and sent concurrently to client (if buffer of 10 have room).
Expand Down Expand Up @@ -216,6 +234,8 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
defer wg.Done()
ss.Series(store, r)
}()

closers = append(closers, ss)
seriesSet = append(seriesSet, ss)
}

Expand All @@ -237,7 +257,12 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
}
return nil
})
return g.Wait()
err := g.Wait()
for _, c := range closers {
runutil.CloseWithLogOnErr(s.logger, c, "close tenant series request")
}
return err

}

// LabelNames returns all known label names.
Expand Down
23 changes: 9 additions & 14 deletions pkg/store/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,24 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
}
}()
for j := range dbs {
tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))

head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: tsdbDir,
SamplesPerSeries: samplesPerSeriesPerTSDB,
Series: seriesPerTSDB,
WithWAL: true,
WithWAL: !flushToBlocks,
Random: random,
SkipChunks: t.IsBenchmark(),
})
testutil.Ok(t, head.Close())

tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j))

for i := 0; i < len(created); i++ {
resps[j] = append(resps[j], storepb.NewSeriesResponse(&created[i]))
resps[j] = append(resps[j], storepb.NewSeriesResponse(created[i]))
}

if flushToBlocks {
db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
testutil.Ok(t, err)

testutil.Ok(t, db.FlushWAL(tmpDir))
testutil.Ok(t, db.Close())
_ = createBlockFromHead(t, tsdbDir, head)
}
testutil.Ok(t, head.Close())

db, err := tsdb.OpenDBReadOnly(tsdbDir, logger)
testutil.Ok(t, err)
Expand All @@ -128,7 +123,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB

store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })

var expected []storepb.Series
var expected []*storepb.Series
lastLabels := storepb.Series{}
for _, resp := range resps {
for _, r := range resp {
Expand All @@ -140,7 +135,7 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
continue
}
lastLabels = x
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}
}

Expand Down
15 changes: 7 additions & 8 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -1616,17 +1617,16 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
var resps []*storepb.SeriesResponse

head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{
Dir: tmpDir,
TSDBDir: filepath.Join(tmpDir, fmt.Sprintf("%d", j)),
SamplesPerSeries: samplesPerSeriesPerClient,
Series: seriesPerClient,
MaxFrameBytes: storetestutil.RemoteReadFrameLimit,
Random: random,
SkipChunks: t.IsBenchmark(),
})
testutil.Ok(t, head.Close())

for i := 0; i < len(created); i++ {
resps = append(resps, storepb.NewSeriesResponse(&created[i]))
resps = append(resps, storepb.NewSeriesResponse(created[i]))
}

clients[j] = &testClient{
Expand All @@ -1647,23 +1647,22 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
}

var allResps []*storepb.SeriesResponse
var expected []storepb.Series
var expected []*storepb.Series
lastLabels := storepb.Series{}
for _, c := range clients {
m := c.(*testClient).StoreClient.(*mockedStoreAPI)

// NOTE: Proxy will merge all series with same labels without any frame limit (https://github.com/thanos-io/thanos/issues/2332).
for _, r := range m.RespSeries {
allResps = append(allResps, r)

// Proxy will merge all series with same labels without limit (https://github.com/thanos-io/thanos/issues/2332).
// Let's do this here as well.
x := storepb.Series{Labels: r.GetSeries().Labels}
if x.String() == lastLabels.String() {
expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...)
continue
}
lastLabels = x
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}

}
Expand Down Expand Up @@ -1700,7 +1699,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
// In this we expect exactly the same response as input.
expected = expected[:0]
for _, r := range allResps {
expected = append(expected, *r.GetSeries())
expected = append(expected, r.GetSeries())
}
storetestutil.TestServerSeries(t, store,
&storetestutil.SeriesCase{
Expand Down
Loading

0 comments on commit c513dab

Please sign in to comment.