Skip to content

Commit

Permalink
e2e tests: Move to efficientgo/e2e (#4610)
Browse files Browse the repository at this point in the history
* Adjust e2ethanos pkg

- Adjusts services to use e2e
- Simplifies some func signatures, remove redundant funcs etc.
- Adds service for reverse proxy (instead of using one on host)
- Adds service for Minio (a workaround, see issue in the func comment)

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Adjust e2e tests

- Mechanical replacement / adjustments to new services, package imports etc.

Signed-off-by: Matej Gera <matejgera@gmail.com>

* go mod update; adjust gitignore

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Fix condition in Query FE

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Simplify propagation of container hostnames

- Remove manually building hostname with netName
- Use InternalEndpoint instead
- Take advantage of future runnable to get hostname addresses for configs

Signed-off-by: Matej Gera <matejgera@gmail.com>
  • Loading branch information
matej-g committed Sep 28, 2021
1 parent fc8e64c commit 360b39e
Show file tree
Hide file tree
Showing 17 changed files with 995 additions and 866 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ kube/.minikube

# Ignore e2e working dirs.
data/
test/e2e/e2e_integration_test*
test/e2e/e2e_*

# Ignore promu artifacts.
/.build
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/cortexproject/cortex v1.10.1-0.20210820081236-70dddb6b70b8
github.com/davecgh/go-spew v1.1.1
github.com/efficientgo/e2e v0.11.1-0.20210829161758-f4cc6dbdc6ea
github.com/efficientgo/tools/core v0.0.0-20210129205121-421d0828c9a6
github.com/efficientgo/tools/extkingpin v0.0.0-20210609125236-d73259166f20
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fatih/structtag v1.1.0
Expand All @@ -33,7 +34,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/snappy v0.0.4
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/grafana/dskit v0.0.0-20210819132858-471020752967
github.com/grafana/dskit v0.0.0-20210819132858-471020752967 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/providers/kit/v2 v2.0.0-20201002093600-73cf2ae9d891
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down
16 changes: 8 additions & 8 deletions pkg/objstore/s3/s3_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"strings"
"testing"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
"github.com/go-kit/kit/log"
"github.com/thanos-io/thanos/pkg/objstore/s3"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
Expand All @@ -23,19 +23,19 @@ func BenchmarkUpload(b *testing.B) {
b.ReportAllocs()
ctx := context.Background()

s, err := e2e.NewScenario("e2e_bench_mino_client")
e, err := e2e.NewDockerEnvironment("e2e_bench_mino_client")
testutil.Ok(b, err)
b.Cleanup(e2ethanos.CleanScenario(b, s))
b.Cleanup(e2ethanos.CleanScenario(b, e))

const bucket = "test"
m := e2edb.NewMinio(8080, bucket)
testutil.Ok(b, s.StartAndWaitReady(m))
const bucket = "benchmark"
m := e2ethanos.NewMinio(e, "benchmark", bucket)
testutil.Ok(b, e2e.StartAndWaitReady(m))

bkt, err := s3.NewBucketWithConfig(log.NewNopLogger(), s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.HTTPEndpoint(),
Endpoint: m.Endpoint("http"),
Insecure: true,
}, "test-feed")
testutil.Ok(b, err)
Expand Down
73 changes: 39 additions & 34 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
"github.com/efficientgo/e2e/matchers"
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/objstore/s3"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
Expand Down Expand Up @@ -336,22 +338,22 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
if penaltyDedup {
name = "e2e_test_compact_penalty_dedup"
}
s, err := e2e.NewScenario(name)
e, err := e2e.NewDockerEnvironment(name)
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, s))
t.Cleanup(e2ethanos.CleanScenario(t, e))

dir := filepath.Join(s.SharedDir(), "tmp")
dir := filepath.Join(e.SharedDir(), "tmp")
testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))

const bucket = "compact_test"
m := e2edb.NewMinio(8080, bucket)
testutil.Ok(t, s.StartAndWaitReady(m))
m := e2ethanos.NewMinio(e, "minio", bucket)
testutil.Ok(t, e2e.StartAndWaitReady(m))

bkt, err := s3.NewBucketWithConfig(logger, s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.HTTPEndpoint(), // We need separate client config, when connecting to minio from outside.
Endpoint: m.Endpoint("http"), // We need separate client config, when connecting to minio from outside.
Insecure: true,
}, "test-feed")
testutil.Ok(t, err)
Expand All @@ -363,7 +365,10 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
for _, b := range blocks {
id, err := b.Create(ctx, dir, justAfterConsistencyDelay, b.hashFunc)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String()))
testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error {
return objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())
}))

rawBlockIDs[id] = struct{}{}
if b.markedForNoCompact {
testutil.Ok(t, block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
Expand Down Expand Up @@ -442,26 +447,26 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.NetworkHTTPEndpoint(),
Endpoint: m.InternalEndpoint("http"),
Insecure: true,
},
}
str, err := e2ethanos.NewStoreGW(s.SharedDir(), "1", svcConfig)
str, err := e2ethanos.NewStoreGW(e, "1", svcConfig)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(str))
testutil.Ok(t, e2e.StartAndWaitReady(str))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", str.GRPCNetworkEndpoint()).Build()
q, err := e2ethanos.NewQuerierBuilder(e, "1", str.InternalEndpoint("grpc")).Build()
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(q))
testutil.Ok(t, e2e.StartAndWaitReady(q))

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

// Check if query detects current series, even if overlapped.
queryAndAssert(t, ctx, q.HTTPEndpoint(),
queryAndAssert(t, ctx, q.Endpoint("http"),
fmt.Sprintf(`count_over_time({a="1"}[13h] offset %ds)`, int64(time.Since(now.Add(12*time.Hour)).Seconds())),
promclient.QueryOptions{
Deduplicate: false, // This should be false, so that we can be sure deduplication was offline.
Expand Down Expand Up @@ -599,7 +604,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
// Precreate a directory. It should be deleted.
// In a hypothetical scenario, the directory could be a left-over from
// a compaction that had crashed.
p := filepath.Join(s.SharedDir(), "data", "compact", "expect-to-halt", "compact")
p := filepath.Join(e.SharedDir(), "data", "compact", "expect-to-halt", "compact")

testutil.Assert(t, len(blocksWithHashes) > 0)

Expand All @@ -613,9 +618,9 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
testutil.Ok(t, err)
testutil.Ok(t, f.Close())

c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
c, err := e2ethanos.NewCompactor(e, "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
testutil.Ok(t, e2e.StartAndWaitReady(c))

// Expect compactor halted and for one cleanup iteration to happen.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_halted"))
Expand All @@ -626,10 +631,10 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

// The compact directory is still there.
dataDir := filepath.Join(s.SharedDir(), "data", "compact", "expect-to-halt")
dataDir := filepath.Join(e.SharedDir(), "data", "compact", "expect-to-halt")
empty, err := isEmptyDir(dataDir)
testutil.Ok(t, err)
testutil.Equals(t, false, empty, "directory %s should not be empty", dataDir)
testutil.Equals(t, false, empty, "directory %e should not be empty", dataDir)

// We expect no ops.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_iterations_total"))
Expand All @@ -646,10 +651,10 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_blocks_cleaned_total"))

// Ensure bucket UI.
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.Endpoint("http"), "global"))
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.Endpoint("http"), "loaded"))

testutil.Ok(t, s.Stop(c))
testutil.Ok(t, c.Stop())

_, err = os.Stat(randBlockDir)
testutil.NotOk(t, err)
Expand All @@ -661,7 +666,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
// Dedup enabled; compactor should work as expected.
{
// Predownload block dirs with hashes. We should not try downloading them again.
p := filepath.Join(s.SharedDir(), "data", "compact", "working")
p := filepath.Join(e.SharedDir(), "data", "compact", "working")

for _, id := range blocksWithHashes {
m, err := block.DownloadMeta(ctx, logger, bkt, id)
Expand All @@ -677,9 +682,9 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
}

// We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction.
c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, extArgs...)
c, err := e2ethanos.NewCompactor(e, "working", svcConfig, nil, extArgs...)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
testutil.Ok(t, e2e.StartAndWaitReady(c))

// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
Expand All @@ -706,9 +711,9 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {

testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_halted"))

bucketMatcher, err := labels.NewMatcher(labels.MatchEqual, "bucket", bucket)
bucketMatcher, err := matchers.NewMatcher(matchers.MatchEqual, "bucket", bucket)
testutil.Ok(t, err)
operationMatcher, err := labels.NewMatcher(labels.MatchEqual, "operation", "get")
operationMatcher, err := matchers.NewMatcher(matchers.MatchEqual, "operation", "get")
testutil.Ok(t, err)
testutil.Ok(t, c.WaitSumMetricsWithOptions(e2e.Equals(478),
[]string{"thanos_objstore_bucket_operations_total"}, e2e.WithLabelMatchers(
Expand All @@ -718,13 +723,13 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
)

// Make sure compactor does not modify anything else over time.
testutil.Ok(t, s.Stop(c))
testutil.Ok(t, c.Stop())

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

// Check if query detects new blocks.
queryAndAssert(t, ctx, q.HTTPEndpoint(),
queryAndAssert(t, ctx, q.Endpoint("http"),
fmt.Sprintf(`count_over_time({a="1"}[13h] offset %ds)`, int64(time.Since(now.Add(12*time.Hour)).Seconds())),
promclient.QueryOptions{
Deduplicate: false, // This should be false, so that we can be sure deduplication was offline.
Expand All @@ -742,9 +747,9 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {
if penaltyDedup {
extArgs = append(extArgs, "--deduplication.func=penalty")
}
c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, extArgs...)
c, err := e2ethanos.NewCompactor(e, "working-dedup", svcConfig, nil, extArgs...)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
testutil.Ok(t, e2e.StartAndWaitReady(c))

// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
Expand All @@ -767,13 +772,13 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) {

testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_halted"))
// Make sure compactor does not modify anything else over time.
testutil.Ok(t, s.Stop(c))
testutil.Ok(t, c.Stop())

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

// Check if query detects new blocks.
queryAndAssert(t, ctx, q.HTTPEndpoint(),
queryAndAssert(t, ctx, q.Endpoint("http"),
fmt.Sprintf(`count_over_time({a="1"}[13h] offset %ds)`, int64(time.Since(now.Add(12*time.Hour)).Seconds())),
promclient.QueryOptions{
Deduplicate: false, // This should be false, so that we can be sure deduplication was offline.
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/e2ethanos/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ import (
"strings"
"testing"

"github.com/cortexproject/cortex/integration/e2e"
"github.com/efficientgo/e2e"

"github.com/thanos-io/thanos/pkg/testutil"
)

func CleanScenario(t testing.TB, s *e2e.Scenario) func() {
func CleanScenario(t testing.TB, e *e2e.DockerEnvironment) func() {
return func() {
// Make sure Clean can properly delete everything.
testutil.Ok(t, exec.Command("chmod", "-R", "777", s.SharedDir()).Run())
s.Close()
testutil.Ok(t, exec.Command("chmod", "-R", "777", e.SharedDir()).Run())
e.Close()
}
}

Expand Down
73 changes: 56 additions & 17 deletions test/e2e/e2ethanos/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,74 @@
package e2ethanos

import (
"github.com/cortexproject/cortex/integration/e2e"
)
"os"
"strconv"

type Service struct {
*e2e.HTTPService
"github.com/efficientgo/e2e"
)

grpc int
type Port struct {
Name string
PortNum int
IsMetrics bool
}

func NewService(
e e2e.Environment,
name string,
image string,
command *e2e.Command,
command e2e.Command,
readiness *e2e.HTTPReadinessProbe,
http, grpc int,
otherPorts ...int,
) *Service {
return &Service{
HTTPService: e2e.NewHTTPService(name, image, command, readiness, http, append(otherPorts, grpc)...),
grpc: grpc,
}
otherPorts ...Port,
) *e2e.InstrumentedRunnable {
return newUninitiatedService(e, name, http, grpc, otherPorts...).Init(
e2e.StartOptions{
Image: image,
Command: command,
Readiness: readiness,
User: strconv.Itoa(os.Getuid()),
WaitReadyBackoff: &defaultBackoffConfig,
},
)
}

func (s *Service) GRPCEndpoint() string { return s.Endpoint(s.grpc) }
func newUninitiatedService(
e e2e.Environment,
name string,
http, grpc int,
otherPorts ...Port,
) *e2e.FutureInstrumentedRunnable {
metricsPorts := "http"
ports := map[string]int{
"http": http,
"grpc": grpc,
}

func (s *Service) GRPCNetworkEndpoint() string {
return s.NetworkEndpoint(s.grpc)
for _, op := range otherPorts {
ports[op.Name] = op.PortNum

if op.IsMetrics {
metricsPorts = op.Name
}
}

return e2e.NewInstrumentedRunnable(e, name, ports, metricsPorts)
}

func (s *Service) GRPCNetworkEndpointFor(networkName string) string {
return s.NetworkEndpointFor(networkName, s.grpc)
func initiateService(
service *e2e.FutureInstrumentedRunnable,
image string,
command e2e.Command,
readiness *e2e.HTTPReadinessProbe,
) *e2e.InstrumentedRunnable {
return service.Init(
e2e.StartOptions{
Image: image,
Command: command,
Readiness: readiness,
User: strconv.Itoa(os.Getuid()),
WaitReadyBackoff: &defaultBackoffConfig,
},
)
}
Loading

0 comments on commit 360b39e

Please sign in to comment.