Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e: Fixing compactor test flakiness. #2313

Merged
merged 1 commit into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- name: Run e2e docker-based tests.
run: make test-e2e-ci
run: make test-e2e
15 changes: 3 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -250,18 +250,9 @@ test-e2e: docker
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go test -failfast -timeout 5m -v ./test/e2e/...

.PHONY: test-e2e-ci
test-e2e-ci: ## Runs all Thanos e2e docker-based e2e tests from test/e2e, using limited resources. Required access to docker daemon.
test-e2e-ci: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go clean -testcache
@go test -failfast -parallel 1 -timeout 5m -v ./test/e2e/...
# NOTE(bwplotka):
# * If you see errors on CI (timeouts), but not locally, try to add -parallel 1 to limit to single CPU to reproduce small 1CPU machine.
@go test -failfast -timeout 10m -v ./test/e2e/...

.PHONY: install-deps
install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests.
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func runCompact(

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction failed")
return errors.Wrap(err, "compaction")
}

if !disableDownsampling {
Expand Down
10 changes: 5 additions & 5 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,17 +585,17 @@ func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaL
}

// Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, view bool) error {
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, _ bool) error {
for u, meta := range metas {
labels := meta.Thanos.Labels
l := meta.Thanos.Labels
for _, replicaLabel := range r.replicaLabels {
if _, exists := labels[replicaLabel]; exists {
if _, exists := l[replicaLabel]; exists {
level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel)
delete(labels, replicaLabel)
delete(l, replicaLabel)
modified.WithLabelValues(replicaRemovedMeta).Inc()
}
}
metas[u].Thanos.Labels = labels
metas[u].Thanos.Labels = l
}
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,11 @@ func (s *Syncer) Groups() (res []*Group, err error) {
groupKey := GroupKey(m.Thanos)
g, ok := groups[groupKey]
if !ok {
lbls := labels.FromMap(m.Thanos.Labels)
g, err = newGroup(
log.With(s.logger, "compactionGroup", groupKey),
log.With(s.logger, "compactionGroup", fmt.Sprintf("%d@%v", m.Thanos.Downsample.Resolution, lbls.String()), "compactionGroupKey", groupKey),
s.bkt,
labels.FromMap(m.Thanos.Labels),
lbls,
m.Thanos.Downsample.Resolution,
s.acceptMalformedIndex,
s.enableVerticalCompaction,
Expand Down Expand Up @@ -435,7 +436,7 @@ func (e HaltError) Error() string {
// IsHaltError returns true if the base error is a HaltError.
// If a multierror is passed, any halt error will return true.
func IsHaltError(err error) bool {
if multiErr, ok := err.(terrors.MultiError); ok {
if multiErr, ok := errors.Cause(err).(terrors.MultiError); ok {
for _, err := range multiErr {
if _, ok := errors.Cause(err).(HaltError); ok {
return true
Expand Down Expand Up @@ -581,6 +582,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
// Check for overlapped blocks.
overlappingBlocks := false
if err := cg.areBlocksOverlapping(nil); err != nil {
// TODO(bwplotka): It would really nice if we could still check for other overlaps than replica. In fact this should be checked
// in syncer itself. Otherwise with vertical compaction enabled we will sacrifice this important check.
if !cg.enableVerticalCompaction {
return false, ulid.ULID{}, halt(errors.Wrap(err, "pre compaction overlap check"))
}
Expand Down Expand Up @@ -853,7 +856,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
continue
}
}
errChan <- errors.Wrap(err, fmt.Sprintf("compaction failed for group %s", g.Key()))
errChan <- errors.Wrapf(err, "group %s", g.Key())
return
}
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func TestHaltMultiError(t *testing.T) {

errs.Add(haltErr)
testutil.Assert(t, IsHaltError(errs), "if any halt errors are present this should return true")
testutil.Assert(t, IsHaltError(errors.Wrap(errs, "wrap")), "halt error with wrap")

}

func TestRetryMultiError(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ func (c *Client) QueryInstant(ctx context.Context, base *url.URL, query string,
}
defer runutil.ExhaustCloseWithLogOnErr(c.logger, resp.Body, "query body")

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, errors.Wrap(err, "read query instant response")
}

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
var m struct {
Expand All @@ -363,11 +368,6 @@ func (c *Client) QueryInstant(ctx context.Context, base *url.URL, query string,
Warnings []string `json:"warnings"`
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, errors.Wrap(err, "read query instant response")
}

if err = json.Unmarshal(body, &m); err != nil {
return nil, nil, errors.Wrap(err, "unmarshal query instant response")
}
Expand Down
112 changes: 42 additions & 70 deletions pkg/testutil/e2eutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package e2eutil

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -333,81 +335,22 @@ func CreateEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels,
return uid, nil
}

// CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each.
// Samples will be in the time range [mint, maxt)
// Block ID will be created with a delay of time duration blockDelay.
func CreateBlockWithBlockDelay(
// CreateBlock writes a block with the given series and numSamples samples each.
// Samples will be in the time range [mint, maxt).
func CreateBlock(
ctx context.Context,
dir string,
series []labels.Labels,
numSamples int,
mint, maxt int64,
blockDelay time.Duration,
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
if err != nil {
return id, errors.Wrap(err, "block creation")
}

id, err = ulid.New(uint64(time.Unix(int64(blockID.Time()), 0).Add(-blockDelay*1000).Unix()), nil)
if err != nil {
return id, errors.Wrap(err, "create block id")
}

if blockID.Compare(id) == 0 {
return
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add some linter for this

}

metaFile := path.Join(dir, blockID.String(), "meta.json")
r, err := os.Open(metaFile)
if err != nil {
return id, errors.Wrap(err, "open meta file")
}

metaContent, err := ioutil.ReadAll(r)
if err != nil {
return id, errors.Wrap(err, "read meta file")
}

m := &metadata.Meta{}
if err := json.Unmarshal(metaContent, m); err != nil {
return id, errors.Wrap(err, "meta.json corrupted")
}
m.ULID = id
m.Compaction.Sources = []ulid.ULID{id}

if err := os.MkdirAll(path.Join(dir, id.String()), 0777); err != nil {
return id, errors.Wrap(err, "create directory")
}

err = copyRecursive(path.Join(dir, blockID.String()), path.Join(dir, id.String()))
if err != nil {
return id, errors.Wrap(err, "copy directory")
}

err = os.RemoveAll(path.Join(dir, blockID.String()))
if err != nil {
return id, errors.Wrap(err, "delete directory")
}

jsonMeta, err := json.MarshalIndent(m, "", "\t")
if err != nil {
return id, errors.Wrap(err, "meta marshal")
}

err = ioutil.WriteFile(path.Join(dir, id.String(), "meta.json"), jsonMeta, 0644)
if err != nil {
return id, errors.Wrap(err, "write meta.json file")
}

return
return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
}

// CreateBlock writes a block with the given series and numSamples samples each.
// Samples will be in the time range [mint, maxt).
func CreateBlock(
// CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block.
func CreateBlockWithTombstone(
ctx context.Context,
dir string,
series []labels.Labels,
Expand All @@ -416,20 +359,45 @@ func CreateBlock(
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true)
}

// CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block.
func CreateBlockWithTombstone(
// CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each.
// Samples will be in the time range [mint, maxt)
// Block ID will be created with a delay of time duration blockDelay.
func CreateBlockWithBlockDelay(
ctx context.Context,
dir string,
series []labels.Labels,
numSamples int,
mint, maxt int64,
blockDelay time.Duration,
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true)
) (ulid.ULID, error) {
blockID, err := createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "block creation")
}

id, err := ulid.New(uint64(timestamp.FromTime(timestamp.Time(int64(blockID.Time())).Add(-blockDelay))), bytes.NewReader(blockID.Entropy()))
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create block id")
}

m, err := metadata.Read(path.Join(dir, blockID.String()))
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "open meta file")
}

m.ULID = id
m.Compaction.Sources = []ulid.ULID{id}

if err := metadata.Write(log.NewNopLogger(), path.Join(dir, blockID.String()), m); err != nil {
return ulid.ULID{}, errors.Wrap(err, "write meta.json file")
}

return id, os.Rename(path.Join(dir, blockID.String()), path.Join(dir, id.String()))
}

func createBlock(
Expand Down Expand Up @@ -497,6 +465,10 @@ func createBlock(
return id, errors.Wrap(err, "write block")
}

if id.Compare(ulid.ULID{}) == 0 {
return id, errors.Errorf("nothing to write, asked for %d samples", numSamples)
}

if _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: resolution},
Expand Down
Loading