Skip to content

Commit

Permalink
Improve and stabilize e2e tests
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed Mar 19, 2020
1 parent 74763e4 commit 66245fd
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 81 deletions.
5 changes: 2 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Changed

- [#2136](https://github.com/thanos-io/thanos/pull/2136) store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion.

- [#2136](https://github.com/thanos-io/thanos/pull/2136) *breaking* store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. If you want to keep existing behavior, you should add `--delete-delay=0s` as a flag.
- [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.

## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,12 @@ test-local:
.PHONY: test-e2e
test-e2e: ## Runs all Thanos e2e docker-based e2e tests from test/e2e. Required access to docker daemon.
test-e2e: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go test -v ./test/e2e/...
@go test -failfast -parallel 1 -timeout 5m -v ./test/e2e/...

.PHONY: install-deps
install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests.
Expand Down
34 changes: 30 additions & 4 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,34 @@ type syncMetrics struct {
modified *extprom.TxGaugeVec
}

func (s *syncMetrics) submit() {
if s == nil {
return
}

if s.synced != nil {
s.synced.Submit()
}

if s.modified != nil {
s.modified.Submit()
}
}

func (s *syncMetrics) resetTx() {
if s == nil {
return
}

if s.synced != nil {
s.synced.ResetTx()
}

if s.modified != nil {
s.modified.ResetTx()
}
}

const (
syncMetricSubSys = "blocks_meta"

Expand Down Expand Up @@ -260,8 +288,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.
metaErrs tsdberrors.MultiError
)

s.metrics.synced.ResetTx()
s.metrics.modified.ResetTx()
s.metrics.resetTx()

for i := 0; i < s.concurrency; i++ {
wg.Add(1)
Expand Down Expand Up @@ -364,8 +391,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.
}

s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas)))
s.metrics.synced.Submit()
s.metrics.modified.Submit()
s.metrics.submit()

if incompleteView {
return metas, partial, errors.Wrap(metaErrs, "incomplete view")
Expand Down
96 changes: 58 additions & 38 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
)

func TestCompact(t *testing.T) {
t.Parallel()
l := log.NewLogfmtLogger(os.Stdout)

// blockDesc describes a recipe to generate blocks from the given series and external labels.
Expand Down Expand Up @@ -109,25 +108,25 @@ func TestCompact(t *testing.T) {
expectOfChunks: 2,
},
{
name: "(full) vertically overlapping blocks with replica labels, downsampling disabled",
name: "(full) vertically overlapping blocks with replica labels downsampling disabled",
blocks: []blockDesc{
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "replica", "1"),
extLset: labels.FromStrings("ext1", "value1", "ext2", "value2", "replica", "1"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
},
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "replica", "2"),
extLset: labels.FromStrings("ext2", "value2", "ext1", "value1", "replica", "2"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
},
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "rule_replica", "1"),
extLset: labels.FromStrings("ext1", "value1", "rule_replica", "1", "ext2", "value2"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
Expand All @@ -142,6 +141,7 @@ func TestCompact(t *testing.T) {
"a": "1",
"b": "2",
"ext1": "value1",
"ext2": "value2",
},
},
expectOfModBlocks: 3,
Expand All @@ -151,56 +151,78 @@ func TestCompact(t *testing.T) {
expectOfChunks: 2,
},
{
name: "(partial) vertically overlapping blocks with replica labels",
name: "(full) vertically overlapping blocks with replica labels, downsampling disabled and extra blocks",
blocks: []blockDesc{
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "replica", "1"),
extLset: labels.FromStrings("ext1", "value1", "ext2", "value2", "replica", "1"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
},
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "replica", "2"),
extLset: labels.FromStrings("ext2", "value2", "ext1", "value1", "replica", "2"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(1 * time.Hour)),
samplesPerSeries: 60,
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
},
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "rule_replica", "1", "ext2", "value2"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
},
{
series: []labels.Labels{labels.FromStrings("c", "1", "d", "2")},
extLset: labels.FromStrings("ext1", "value1", "ext2", "value2"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
},
{
series: []labels.Labels{labels.FromStrings("c", "1", "d", "2")},
extLset: labels.FromStrings("ext3", "value3"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
},
},
replicaLabels: []string{"replica"},
downsamplingEnabled: true,
replicaLabels: []string{"replica", "rule_replica"},
downsamplingEnabled: false,
query: "{a=\"1\"}",

expected: []model.Metric{
{
"a": "1",
"b": "2",
"ext1": "value1",
"ext2": "value2",
},
},
expectOfModBlocks: 2,
expectOfBlocks: 1,
expectOfSamples: 179, // TODO(kakkoyun): ?
expectOfSeries: 1,
expectOfChunks: 2,
expectOfModBlocks: 3,
expectOfBlocks: 2,
expectOfSamples: 360,
expectOfSeries: 3,
expectOfChunks: 6,
},
{
name: "(contains) vertically overlapping blocks with replica labels",
name: "(partial) vertically overlapping blocks with replica labels",
blocks: []blockDesc{
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "replica", "1"),
mint: timestamp.FromTime(now.Add(30 * time.Minute)),
maxt: timestamp.FromTime(now.Add(1 * time.Hour)),
samplesPerSeries: 90,
extLset: labels.FromStrings("ext1", "value1", "ext2", "value2", "replica", "1"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 119,
},
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "replica", "2"),
extLset: labels.FromStrings("ext2", "value2", "ext1", "value1", "replica", "2"),
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(2 * time.Hour)),
samplesPerSeries: 120,
maxt: timestamp.FromTime(now.Add(1 * time.Hour)),
samplesPerSeries: 59,
},
},
replicaLabels: []string{"replica"},
Expand All @@ -212,11 +234,12 @@ func TestCompact(t *testing.T) {
"a": "1",
"b": "2",
"ext1": "value1",
"ext2": "value2",
},
},
expectOfModBlocks: 2,
expectOfBlocks: 1,
expectOfSamples: 210, // TODO(kakkoyun): ?
expectOfSamples: 119,
expectOfSeries: 1,
expectOfChunks: 2,
},
Expand All @@ -228,14 +251,14 @@ func TestCompact(t *testing.T) {
extLset: labels.FromStrings("ext1", "value1", "replica", "1"),
mint: timestamp.FromTime(now.Add(30 * time.Minute)),
maxt: timestamp.FromTime(now.Add(150 * time.Minute)),
samplesPerSeries: 120,
samplesPerSeries: 119,
},
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
extLset: labels.FromStrings("ext1", "value1", "replica", "2"),
mint: timestamp.FromTime(now.Add(-30 * time.Minute)),
maxt: timestamp.FromTime(now.Add(90 * time.Minute)),
samplesPerSeries: 120,
mint: timestamp.FromTime(now),
maxt: timestamp.FromTime(now.Add(120 * time.Minute)),
samplesPerSeries: 119,
},
},
replicaLabels: []string{"replica"},
Expand All @@ -251,12 +274,12 @@ func TestCompact(t *testing.T) {
},
expectOfModBlocks: 2,
expectOfBlocks: 1,
expectOfSamples: 240, // TODO(kakkoyun): ?
expectOfSamples: 149,
expectOfSeries: 1,
expectOfChunks: 2,
},
{
name: "(full) vertically overlapping blocks with replica labels, retention specified",
name: "(full) vertically overlapping blocks with replica labels retention specified",
blocks: []blockDesc{
{
series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")},
Expand Down Expand Up @@ -345,11 +368,9 @@ func TestCompact(t *testing.T) {
i := i
tcase := tcase
t.Run(tcase.name, func(t *testing.T) {
t.Parallel()

s, err := e2e.NewScenario("e2e_test_compact_" + strconv.Itoa(i))
testutil.Ok(t, err)
defer s.Close()
defer s.Close() // TODO(kakkoyun): Change with t.CleanUp after go 1.14 update.

dir := filepath.Join(s.SharedDir(), "tmp_"+strconv.Itoa(i))
testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), dir), os.ModePerm))
Expand All @@ -368,8 +389,8 @@ func TestCompact(t *testing.T) {
}, "test-feed")
testutil.Ok(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel() // TODO(kakkoyun): Change with t.CleanUp after go 1.14 update.

var rawBlockIds []ulid.ULID
for _, b := range tcase.blocks {
Expand Down Expand Up @@ -405,7 +426,6 @@ func TestCompact(t *testing.T) {
tcase.downsamplingEnabled,
append(dedupFlags, retenFlags...)...,
)

testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(cmpt))
testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIds))), "thanos_blocks_meta_synced"))
Expand Down
Loading

0 comments on commit 66245fd

Please sign in to comment.