From 65b23e2b4236a5a26f6eaa1a21dc0e5c4e0c38d8 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 22 Jan 2018 22:52:35 +0100 Subject: [PATCH 01/15] reorganize chaos dir --- chaos/{ => docker}/docker.go | 17 ++-- chaos/{ => docker}/path.go | 4 +- .../{ingest.go => fakemetrics/fakemetrics.go} | 7 +- .../out/kafkamdm/kafkamdm.go | 2 +- chaos/{ => fakemetrics}/out/out.go | 0 chaos/{ => fakemetrics}/out/saramahelper.go | 0 chaos/{ => grafana}/grafana.go | 4 +- chaos/{ => graphite}/graphite.go | 42 +++++----- .../response.go} | 4 +- .../validate.go} | 34 ++++---- .../chaos_cluster/chaos_cluster_test.go} | 79 ++++++++++--------- chaos/{ => track}/tracker.go | 2 +- 12 files changed, 100 insertions(+), 95 deletions(-) rename chaos/{ => docker}/docker.go (85%) rename chaos/{ => docker}/path.go (91%) rename chaos/{ingest.go => fakemetrics/fakemetrics.go} (92%) rename chaos/{ => fakemetrics}/out/kafkamdm/kafkamdm.go (98%) rename chaos/{ => fakemetrics}/out/out.go (100%) rename chaos/{ => fakemetrics}/out/saramahelper.go (100%) rename chaos/{ => grafana}/grafana.go (94%) rename chaos/{ => graphite}/graphite.go (82%) rename chaos/{graphite_response.go => graphite/response.go} (97%) rename chaos/{graphite_validate.go => graphite/validate.go} (81%) rename chaos/{chaos_test.go => tests/chaos_cluster/chaos_cluster_test.go} (65%) rename chaos/{ => track}/tracker.go (99%) diff --git a/chaos/docker.go b/chaos/docker/docker.go similarity index 85% rename from chaos/docker.go rename to chaos/docker/docker.go index 97c161d8d9..fcc7a7d5a7 100644 --- a/chaos/docker.go +++ b/chaos/docker/docker.go @@ -1,4 +1,4 @@ -package chaos +package docker import ( "context" @@ -7,6 +7,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/client" + "github.com/grafana/metrictank/chaos/track" ) var cli *client.Client @@ -61,18 +62,18 @@ func stop(name string) error { } */ -// isolate isolates traffic between containers in setA and containers in setB -func isolate(setA, setB []string, dur string) error { +// Isolate isolates traffic between containers in setA and containers in setB +func Isolate(setA, setB []string, dur string) error { // note: isolateOut should return very fast (order of ms) // so we can run all this in serial for _, a := range setA { - err := isolateOut(a, dur, setB...) + err := IsolateOut(a, dur, setB...) if err != nil { return err } } for _, b := range setB { - err := isolateOut(b, dur, setA...) + err := IsolateOut(b, dur, setA...) 
if err != nil { return err } @@ -80,8 +81,8 @@ func isolate(setA, setB []string, dur string) error { return nil } -// isolateOut isolates traffic from the given docker container to all others matching the expression -func isolateOut(name, dur string, targets ...string) error { +// IsolateOut isolates traffic from the given docker container to all others matching the expression +func IsolateOut(name, dur string, targets ...string) error { containers, err := cli.ContainerList(context.Background(), types.ContainerListOptions{}) if err != nil { return err @@ -108,7 +109,7 @@ func isolateOut(name, dur string, targets ...string) error { } // log all pumba's output - _, err = NewTracker(cmd, false, false, "pumba-stdout", "pumba-stderr") + _, err = track.NewTracker(cmd, false, false, "pumba-stdout", "pumba-stderr") if err != nil { return err } diff --git a/chaos/path.go b/chaos/docker/path.go similarity index 91% rename from chaos/path.go rename to chaos/docker/path.go index 4ea9ac5da6..67d89fd095 100644 --- a/chaos/path.go +++ b/chaos/docker/path.go @@ -1,4 +1,4 @@ -package chaos +package docker import ( "os" @@ -10,7 +10,7 @@ import ( // path takes a relative path within the metrictank repository and returns the full absolute filepath, // assuming metrictank repo is in the first directory specified in GOPATH -func path(dst string) string { +func Path(dst string) string { gopath := os.Getenv("GOPATH") if gopath == "" { var err error diff --git a/chaos/ingest.go b/chaos/fakemetrics/fakemetrics.go similarity index 92% rename from chaos/ingest.go rename to chaos/fakemetrics/fakemetrics.go index 10e9f329b4..eb3245b5a4 100644 --- a/chaos/ingest.go +++ b/chaos/fakemetrics/fakemetrics.go @@ -1,11 +1,11 @@ -package chaos +package fakemetrics import ( "fmt" "log" "time" - "github.com/grafana/metrictank/chaos/out/kafkamdm" + "github.com/grafana/metrictank/chaos/fakemetrics/out/kafkamdm" "github.com/grafana/metrictank/clock" "github.com/raintank/met/helper" "gopkg.in/raintank/schema.v1" @@ -31,13 +31,12 @@ func init() { } } -func fakeMetrics() { +func Kafka() { stats, _ := helper.New(false, "", "standard", "", "") out, err := kafkamdm.New("mdm", []string{"localhost:9092"}, "none", stats, "lastNum") if err != nil { log.Fatal(4, "failed to create kafka-mdm output. 
%s", err) } - // advantage over regular ticker: // 1) no ticks dropped // 2) ticks come asap after the start of a new second, so we can measure better how long it took to get the data diff --git a/chaos/out/kafkamdm/kafkamdm.go b/chaos/fakemetrics/out/kafkamdm/kafkamdm.go similarity index 98% rename from chaos/out/kafkamdm/kafkamdm.go rename to chaos/fakemetrics/out/kafkamdm/kafkamdm.go index 2669917b31..1de9680cf5 100644 --- a/chaos/out/kafkamdm/kafkamdm.go +++ b/chaos/fakemetrics/out/kafkamdm/kafkamdm.go @@ -9,7 +9,7 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/grafana/metrictank/chaos/out" + "github.com/grafana/metrictank/chaos/fakemetrics/out" p "github.com/grafana/metrictank/cluster/partitioner" "github.com/raintank/met" "github.com/raintank/worldping-api/pkg/log" diff --git a/chaos/out/out.go b/chaos/fakemetrics/out/out.go similarity index 100% rename from chaos/out/out.go rename to chaos/fakemetrics/out/out.go diff --git a/chaos/out/saramahelper.go b/chaos/fakemetrics/out/saramahelper.go similarity index 100% rename from chaos/out/saramahelper.go rename to chaos/fakemetrics/out/saramahelper.go diff --git a/chaos/grafana.go b/chaos/grafana/grafana.go similarity index 94% rename from chaos/grafana.go rename to chaos/grafana/grafana.go index d95e8728c2..786ab2d3b5 100644 --- a/chaos/grafana.go +++ b/chaos/grafana/grafana.go @@ -1,4 +1,4 @@ -package chaos +package grafana import ( "bytes" @@ -22,7 +22,7 @@ type annotation struct { Text string } -func postAnnotation(msg string) { +func PostAnnotation(msg string) { go func() { a := annotation{ Time: int(time.Now().UnixNano() / 1000 / 1000), diff --git a/chaos/graphite.go b/chaos/graphite/graphite.go similarity index 82% rename from chaos/graphite.go rename to chaos/graphite/graphite.go index 0248f2858a..5590414381 100644 --- a/chaos/graphite.go +++ b/chaos/graphite/graphite.go @@ -1,4 +1,4 @@ -package chaos +package graphite import ( "encoding/json" @@ -20,8 +20,8 @@ func init() { } } -func renderQuery(base, target, from string) response { - var r response +func renderQuery(base, target, from string) Response { + var r Response url := fmt.Sprintf("%s/render?target=%s&format=json&from=%s", base, target, from) req, err := http.NewRequest("GET", url, nil) if err != nil { @@ -44,14 +44,14 @@ func renderQuery(base, target, from string) response { return r } -func retryGraphite(query, from string, times int, validate Validator) (bool, response) { +func RetryGraphite(query, from string, times int, validate Validator) (bool, Response) { return retry(query, from, times, validate, "http://localhost") } -func retryMT(query, from string, times int, validate Validator) (bool, response) { +func RetryMT(query, from string, times int, validate Validator) (bool, Response) { return retry(query, from, times, validate, "http://localhost:6060") } -func retry(query, from string, times int, validate Validator, base string) (bool, response) { - var resp response +func retry(query, from string, times int, validate Validator, base string) (bool, Response) { + var resp Response for i := 0; i < times; i++ { if i > 0 { time.Sleep(time.Second) @@ -72,17 +72,17 @@ type checkResultsTemp struct { empty int timeout int other int - firstOther *response + firstOther *Response } // final outcome of check results -type checkResults struct { - valid []int // each position corresponds to a validator +type CheckResults struct { + Valid []int // each position corresponds to a validator // categories of invalid responses - empty int - timeout int - other int - 
firstOther *response + Empty int + Timeout int + Other int + FirstOther *Response } func newCheckResultsTemp(validators []Validator) *checkResultsTemp { @@ -133,7 +133,7 @@ func checkWorker(base, query, from string, wg *sync.WaitGroup, cr *checkResultsT // meaning the counts of each response matching each validator function, and the number // of timeouts, and finally all others (non-timeouting invalid responses) // we recommend for 60s duration to use 6000 requests, e.g. 100 per second -func checkMT(endpoints []int, query, from string, dur time.Duration, reqs int, validators ...Validator) checkResults { +func CheckMT(endpoints []int, query, from string, dur time.Duration, reqs int, validators ...Validator) CheckResults { pre := time.Now() ret := newCheckResultsTemp(validators) period := dur / time.Duration(reqs) @@ -154,11 +154,11 @@ func checkMT(endpoints []int, query, from string, dur time.Duration, reqs int, v if time.Since(pre) > (110*dur/100)+2*time.Second { panic(fmt.Sprintf("checkMT ran too long for some reason. expected %s. took actually %s. system overloaded?", dur, time.Since(pre))) } - return checkResults{ - valid: ret.valid, - empty: ret.empty, - timeout: ret.timeout, - other: ret.other, - firstOther: ret.firstOther, + return CheckResults{ + Valid: ret.valid, + Empty: ret.empty, + Timeout: ret.timeout, + Other: ret.other, + FirstOther: ret.firstOther, } } diff --git a/chaos/graphite_response.go b/chaos/graphite/response.go similarity index 97% rename from chaos/graphite_response.go rename to chaos/graphite/response.go index f10a80d856..957ad34b0a 100644 --- a/chaos/graphite_response.go +++ b/chaos/graphite/response.go @@ -1,4 +1,4 @@ -package chaos +package graphite import ( "bytes" @@ -7,7 +7,7 @@ import ( "strconv" ) -type Response []Series +type Data []Series type Series struct { Target string diff --git a/chaos/graphite_validate.go b/chaos/graphite/validate.go similarity index 81% rename from chaos/graphite_validate.go rename to chaos/graphite/validate.go index 2664cb01f0..541d1492b8 100644 --- a/chaos/graphite_validate.go +++ b/chaos/graphite/validate.go @@ -1,23 +1,23 @@ -package chaos +package graphite import "math" -// response is a convenience type: +// Response is a convenience type: // it provides original http and json decode errors, if applicable // and also the decoded response body, if any -type response struct { +type Response struct { httpErr error decodeErr error code int traceID string - r Response + r Data } -type Validator func(resp response) bool +type Validator func(resp Response) bool -// validateTargets returns a function that validates that the response contains exactly all named targets -func validateTargets(targets []string) Validator { - return func(resp response) bool { +// ValidateTargets returns a function that validates that the response contains exactly all named targets +func ValidateTargets(targets []string) Validator { + return func(resp Response) bool { if resp.httpErr != nil || resp.decodeErr != nil || resp.code != 200 { return false } @@ -33,7 +33,7 @@ func validateTargets(targets []string) Validator { } } -// validateCorrect returns a validator with a min number, +// ValidateCorrect returns a validator with a min number, // which will validate whether we received a "sufficiently correct" // response. We assume the response corresponds to a sumSeries() query // of multiple series, typically across shards across different instances. 
@@ -44,8 +44,8 @@ func validateTargets(targets []string) Validator { // to allow 4 shards being down and unaccounted for, pass 8. // NOTE: 8 points are ignored (see comments further down) so you should only call this // for sufficiently long series, e.g. 15 points or so. -func validateCorrect(num float64) Validator { - return func(resp response) bool { +func ValidateCorrect(num float64) Validator { + return func(resp Response) bool { if resp.httpErr != nil || resp.decodeErr != nil || resp.code != 200 { return false } @@ -72,9 +72,9 @@ func validateCorrect(num float64) Validator { } } -// validaterCode returns a validator that validates whether the response has the given code -func validateCode(code int) Validator { - return func(resp response) bool { +// ValidaterCode returns a validator that validates whether the response has the given code +func ValidateCode(code int) Validator { + return func(resp Response) bool { if resp.code == code { return true } @@ -82,14 +82,14 @@ func validateCode(code int) Validator { } } -// validatorAvgWindowed returns a validator that validates the number of series and the avg value of each series +// ValidatorAvgWindowed returns a validator that validates the number of series and the avg value of each series // it is windowed to allow the dataset to include one or two values that would be evened out by a value // just outside of the response. For example: // response: NaN 4 4 4 5 3 4 4 4 5 // clearly here we can trust that if the avg value should be 4, that there would be a 3 coming after the response // but we don't want to wait for that. // NOTE: ignores up to 2 points from each series, adjust your input size accordingly for desired confidence -func validatorAvgWindowed(numPoints int, avg float64) Validator { +func ValidatorAvgWindowed(numPoints int, avg float64) Validator { try := func(datapoints []Point) bool { for i := 0; i <= 1; i++ { Try: @@ -109,7 +109,7 @@ func validatorAvgWindowed(numPoints int, avg float64) Validator { } return false } - return func(resp response) bool { + return func(resp Response) bool { for _, series := range resp.r { if len(series.Datapoints) != numPoints { return false diff --git a/chaos/chaos_test.go b/chaos/tests/chaos_cluster/chaos_cluster_test.go similarity index 65% rename from chaos/chaos_test.go rename to chaos/tests/chaos_cluster/chaos_cluster_test.go index d9ab85fe1d..7a052c32c4 100644 --- a/chaos/chaos_test.go +++ b/chaos/tests/chaos_cluster/chaos_cluster_test.go @@ -1,4 +1,4 @@ -package chaos +package chaos_cluster import ( "context" @@ -11,28 +11,33 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/grafana/metrictank/chaos/docker" + "github.com/grafana/metrictank/chaos/fakemetrics" + "github.com/grafana/metrictank/chaos/grafana" + "github.com/grafana/metrictank/chaos/graphite" + "github.com/grafana/metrictank/chaos/track" ) // TODO: cleanup when ctrl-C go test (teardown all containers) -var tracker *Tracker +var tracker *track.Tracker func TestMain(m *testing.M) { ctx, cancelFunc := context.WithCancel(context.Background()) fmt.Println("stopping docker-chaos stack should it be running...") cmd := exec.CommandContext(ctx, "docker-compose", "down") - cmd.Dir = path("docker/docker-chaos") + cmd.Dir = docker.Path("docker/docker-chaos") err := cmd.Start() if err != nil { log.Fatal(err) } fmt.Println("launching docker-chaos stack...") - cmd = exec.CommandContext(ctx, path("docker/launch.sh"), "docker-chaos") + cmd = exec.CommandContext(ctx, docker.Path("docker/launch.sh"), "docker-chaos") cmd.Env = 
append(cmd.Env, "MT_CLUSTER_MIN_AVAILABLE_SHARDS=12") - tracker, err = NewTracker(cmd, false, false, "launch-stdout", "launch-stderr") + tracker, err = track.NewTracker(cmd, false, false, "launch-stdout", "launch-stderr") if err != nil { log.Fatal(err) } @@ -55,7 +60,7 @@ func TestMain(m *testing.M) { } func TestClusterStartup(t *testing.T) { - matchers := []Matcher{ + matchers := []track.Matcher{ {Str: "metrictank0_1.*metricIndex initialized.*starting data consumption$"}, {Str: "metrictank1_1.*metricIndex initialized.*starting data consumption$"}, {Str: "metrictank2_1.*metricIndex initialized.*starting data consumption$"}, @@ -69,17 +74,17 @@ func TestClusterStartup(t *testing.T) { fmt.Println("stack now running.") fmt.Println("Go to http://localhost:3000 (and login as admin:admin) to see what's going on") case <-time.After(time.Second * 40): - postAnnotation("TestClusterStartup:FAIL") + grafana.PostAnnotation("TestClusterStartup:FAIL") t.Fatal("timed out while waiting for all metrictank instances to come up") } } func TestClusterBaseIngestWorkload(t *testing.T) { - postAnnotation("TestClusterBaseIngestWorkload:begin") + grafana.PostAnnotation("TestClusterBaseIngestWorkload:begin") - go fakeMetrics() + go fakemetrics.Kafka() - suc6, resp := retryGraphite("perSecond(metrictank.stats.docker-cluster.*.input.kafka-mdm.metrics_received.counter32)", "-8s", 18, func(resp response) bool { + suc6, resp := graphite.RetryGraphite("perSecond(metrictank.stats.docker-cluster.*.input.kafka-mdm.metrics_received.counter32)", "-8s", 18, func(resp graphite.Response) bool { exp := []string{ "perSecond(metrictank.stats.docker-cluster.metrictank0.input.kafka-mdm.metrics_received.counter32)", "perSecond(metrictank.stats.docker-cluster.metrictank1.input.kafka-mdm.metrics_received.counter32)", @@ -89,33 +94,33 @@ func TestClusterBaseIngestWorkload(t *testing.T) { "perSecond(metrictank.stats.docker-cluster.metrictank5.input.kafka-mdm.metrics_received.counter32)", } // avg rate must be 4 (metrics ingested per second by each instance) - return validateTargets(exp)(resp) && validatorAvgWindowed(8, 4)(resp) + return graphite.ValidateTargets(exp)(resp) && graphite.ValidatorAvgWindowed(8, 4)(resp) }) if !suc6 { - postAnnotation("TestClusterBaseIngestWorkload:FAIL") + grafana.PostAnnotation("TestClusterBaseIngestWorkload:FAIL") t.Fatalf("cluster did not reach a state where each MT instance receives 4 points per second. last response was: %s", spew.Sdump(resp)) } - suc6, resp = retryMT("sum(some.id.of.a.metric.*)", "-16s", 20, validateCorrect(12)) + suc6, resp = graphite.RetryMT("sum(some.id.of.a.metric.*)", "-16s", 20, graphite.ValidateCorrect(12)) if !suc6 { - postAnnotation("TestClusterBaseIngestWorkload:FAIL") + grafana.PostAnnotation("TestClusterBaseIngestWorkload:FAIL") t.Fatalf("could not query correct result set. sum of 12 series, each valued 1, should result in 12. 
last response was: %s", spew.Sdump(resp)) } } func TestQueryWorkload(t *testing.T) { - postAnnotation("TestQueryWorkload:begin") + grafana.PostAnnotation("TestQueryWorkload:begin") - results := checkMT([]int{6060, 6061, 6062, 6063, 6064, 6065}, "sum(some.id.of.a.metric.*)", "-14s", time.Minute, 6000, validateCorrect(12)) + results := graphite.CheckMT([]int{6060, 6061, 6062, 6063, 6064, 6065}, "sum(some.id.of.a.metric.*)", "-14s", time.Minute, 6000, graphite.ValidateCorrect(12)) - exp := checkResults{ - valid: []int{6000}, - empty: 0, - timeout: 0, - other: 0, + exp := graphite.CheckResults{ + Valid: []int{6000}, + Empty: 0, + Timeout: 0, + Other: 0, } if !reflect.DeepEqual(exp, results) { - postAnnotation("TestQueryWorkload:FAIL") + grafana.PostAnnotation("TestQueryWorkload:FAIL") t.Fatalf("expected only correct results. got %s", spew.Sdump(results)) } } @@ -127,50 +132,50 @@ func TestQueryWorkload(t *testing.T) { // the isolated shard should either return correct replies, or errors (in two cases: when it marks any shards as down, // but also before it does, but fails to get data via clustered requests from peers) func TestIsolateOneInstance(t *testing.T) { - postAnnotation("TestIsolateOneInstance:begin") + grafana.PostAnnotation("TestIsolateOneInstance:begin") numReqMt4 := 1200 - mt4ResultsChan := make(chan checkResults, 1) - otherResultsChan := make(chan checkResults, 1) + mt4ResultsChan := make(chan graphite.CheckResults, 1) + otherResultsChan := make(chan graphite.CheckResults, 1) go func() { - mt4ResultsChan <- checkMT([]int{6064}, "sum(some.id.of.a.metric.*)", "-15s", time.Minute, numReqMt4, validateCorrect(12), validateCode(503)) + mt4ResultsChan <- graphite.CheckMT([]int{6064}, "sum(some.id.of.a.metric.*)", "-15s", time.Minute, numReqMt4, graphite.ValidateCorrect(12), graphite.ValidateCode(503)) }() go func() { - otherResultsChan <- checkMT([]int{6060, 6061, 6062, 6063, 6065}, "sum(some.id.of.a.metric.*)", "-15s", time.Minute, 6000, validateCorrect(12)) + otherResultsChan <- graphite.CheckMT([]int{6060, 6061, 6062, 6063, 6065}, "sum(some.id.of.a.metric.*)", "-15s", time.Minute, 6000, graphite.ValidateCorrect(12)) }() // now go ahead and isolate for 30s - isolate([]string{"metrictank4"}, []string{"metrictank0", "metrictank1", "metrictank2", "metrictank3", "metrictank5"}, "30s") + docker.Isolate([]string{"metrictank4"}, []string{"metrictank0", "metrictank1", "metrictank2", "metrictank3", "metrictank5"}, "30s") // collect results of the minute long experiment mt4Results := <-mt4ResultsChan otherResults := <-otherResultsChan // validate results of isolated node - if mt4Results.valid[0]+mt4Results.valid[1] != numReqMt4 { + if mt4Results.Valid[0]+mt4Results.Valid[1] != numReqMt4 { t.Fatalf("expected mt4 to return either correct or erroring responses (total %d). got %s", numReqMt4, spew.Sdump(mt4Results)) } - if mt4Results.valid[1] < numReqMt4*30/100 { + if mt4Results.Valid[1] < numReqMt4*30/100 { // the instance is completely down for 30s of the 60s experiment run, but we allow some slack t.Fatalf("expected at least 30%% of all mt4 results to succeed. did %d queries. 
got %s", numReqMt4, spew.Sdump(mt4Results)) } // validate results of other cluster nodes - exp := checkResults{ - valid: []int{6000}, - empty: 0, - timeout: 0, - other: 0, + exp := graphite.CheckResults{ + Valid: []int{6000}, + Empty: 0, + Timeout: 0, + Other: 0, } if !reflect.DeepEqual(exp, otherResults) { - postAnnotation("TestIsolateOneInstance:FAIL") + grafana.PostAnnotation("TestIsolateOneInstance:FAIL") t.Fatalf("expected only correct results for all cluster nodes. got %s", spew.Sdump(otherResults)) } } func TestHang(t *testing.T) { - postAnnotation("TestHang:begin") + grafana.PostAnnotation("TestHang:begin") t.Log("whatever happens, keep hanging for now, so that we can query grafana dashboards still") var ch chan struct{} <-ch diff --git a/chaos/tracker.go b/chaos/track/tracker.go similarity index 99% rename from chaos/tracker.go rename to chaos/track/tracker.go index eb420085de..990535665a 100644 --- a/chaos/tracker.go +++ b/chaos/track/tracker.go @@ -1,4 +1,4 @@ -package chaos +package track import ( "bufio" From d12abf398fbc0624c08db5afcf1c88b8cf54dcb5 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 23 Jan 2018 13:02:47 +0100 Subject: [PATCH 02/15] more flexible comparator mechanism will be useful for carbon tests --- chaos/graphite/comparator.go | 17 +++++++++++++++++ chaos/graphite/validate.go | 4 ++-- chaos/tests/chaos_cluster/chaos_cluster_test.go | 2 +- 3 files changed, 20 insertions(+), 3 deletions(-) create mode 100644 chaos/graphite/comparator.go diff --git a/chaos/graphite/comparator.go b/chaos/graphite/comparator.go new file mode 100644 index 0000000000..4bb62b5963 --- /dev/null +++ b/chaos/graphite/comparator.go @@ -0,0 +1,17 @@ +package graphite + +import "math" + +type Comparator func(p float64) bool + +func Eq(good float64) Comparator { + return func(p float64) bool { + return (math.IsNaN(good) && math.IsNaN(p)) || p == good + } +} + +func Ge(good float64) Comparator { + return func(p float64) bool { + return (math.IsNaN(good) && math.IsNaN(p)) || p >= good + } +} diff --git a/chaos/graphite/validate.go b/chaos/graphite/validate.go index 541d1492b8..6e7c61b077 100644 --- a/chaos/graphite/validate.go +++ b/chaos/graphite/validate.go @@ -89,7 +89,7 @@ func ValidateCode(code int) Validator { // clearly here we can trust that if the avg value should be 4, that there would be a 3 coming after the response // but we don't want to wait for that. 
// NOTE: ignores up to 2 points from each series, adjust your input size accordingly for desired confidence -func ValidatorAvgWindowed(numPoints int, avg float64) Validator { +func ValidatorAvgWindowed(numPoints int, cmp Comparator) Validator { try := func(datapoints []Point) bool { for i := 0; i <= 1; i++ { Try: @@ -102,7 +102,7 @@ func ValidatorAvgWindowed(numPoints int, avg float64) Validator { } sum += p.Val } - if sum/float64(len(points)) == avg { + if cmp(sum / float64(len(points))) { return true } } diff --git a/chaos/tests/chaos_cluster/chaos_cluster_test.go b/chaos/tests/chaos_cluster/chaos_cluster_test.go index 7a052c32c4..c6fbe44083 100644 --- a/chaos/tests/chaos_cluster/chaos_cluster_test.go +++ b/chaos/tests/chaos_cluster/chaos_cluster_test.go @@ -94,7 +94,7 @@ func TestClusterBaseIngestWorkload(t *testing.T) { "perSecond(metrictank.stats.docker-cluster.metrictank5.input.kafka-mdm.metrics_received.counter32)", } // avg rate must be 4 (metrics ingested per second by each instance) - return graphite.ValidateTargets(exp)(resp) && graphite.ValidatorAvgWindowed(8, 4)(resp) + return graphite.ValidateTargets(exp)(resp) && graphite.ValidatorAvgWindowed(8, graphite.Eq(4))(resp) }) if !suc6 { grafana.PostAnnotation("TestClusterBaseIngestWorkload:FAIL") From 98ef96e1e25c749526cfca962acbc8aeeb4206b5 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 23 Jan 2018 13:04:24 +0100 Subject: [PATCH 03/15] simpler --- chaos/graphite/validate.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/chaos/graphite/validate.go b/chaos/graphite/validate.go index 6e7c61b077..43efe2d241 100644 --- a/chaos/graphite/validate.go +++ b/chaos/graphite/validate.go @@ -75,10 +75,7 @@ func ValidateCorrect(num float64) Validator { // ValidaterCode returns a validator that validates whether the response has the given code func ValidateCode(code int) Validator { return func(resp Response) bool { - if resp.code == code { - return true - } - return false + return resp.code == code } } From 4eac8cdd61b2147ad5a1c4b212fb4752d67a9696 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 23 Jan 2018 13:01:27 +0100 Subject: [PATCH 04/15] cleanly handle abort otherwise got errors like 0: already closed --- chaos/docker/docker.go | 2 +- chaos/tests/chaos_cluster/chaos_cluster_test.go | 2 +- chaos/track/tracker.go | 12 +++++++++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/chaos/docker/docker.go b/chaos/docker/docker.go index fcc7a7d5a7..fe907a0025 100644 --- a/chaos/docker/docker.go +++ b/chaos/docker/docker.go @@ -109,7 +109,7 @@ func IsolateOut(name, dur string, targets ...string) error { } // log all pumba's output - _, err = track.NewTracker(cmd, false, false, "pumba-stdout", "pumba-stderr") + _, err = track.NewTracker(context.TODO(), cmd, false, false, "pumba-stdout", "pumba-stderr") if err != nil { return err } diff --git a/chaos/tests/chaos_cluster/chaos_cluster_test.go b/chaos/tests/chaos_cluster/chaos_cluster_test.go index c6fbe44083..23b6319314 100644 --- a/chaos/tests/chaos_cluster/chaos_cluster_test.go +++ b/chaos/tests/chaos_cluster/chaos_cluster_test.go @@ -37,7 +37,7 @@ func TestMain(m *testing.M) { cmd = exec.CommandContext(ctx, docker.Path("docker/launch.sh"), "docker-chaos") cmd.Env = append(cmd.Env, "MT_CLUSTER_MIN_AVAILABLE_SHARDS=12") - tracker, err = track.NewTracker(cmd, false, false, "launch-stdout", "launch-stderr") + tracker, err = track.NewTracker(ctx, cmd, false, false, "launch-stdout", "launch-stderr") if err != nil { log.Fatal(err) } diff --git 
a/chaos/track/tracker.go b/chaos/track/tracker.go index 990535665a..dbbd930087 100644 --- a/chaos/track/tracker.go +++ b/chaos/track/tracker.go @@ -2,6 +2,7 @@ package track import ( "bufio" + "context" "fmt" "io" "os/exec" @@ -11,6 +12,7 @@ import ( // Tracker allows to track stdout and stderr of running commands // and wait for certain messages to appear type Tracker struct { + ctx context.Context stdout io.ReadCloser stderr io.ReadCloser stdoutChan chan string @@ -23,7 +25,7 @@ type Tracker struct { prefixStderr string } -func NewTracker(cmd *exec.Cmd, logStdout, logStderr bool, prefixStdout, prefixStderr string) (*Tracker, error) { +func NewTracker(ctx context.Context, cmd *exec.Cmd, logStdout, logStderr bool, prefixStdout, prefixStderr string) (*Tracker, error) { stdout, err := cmd.StdoutPipe() if err != nil { return nil, err @@ -33,6 +35,7 @@ func NewTracker(cmd *exec.Cmd, logStdout, logStderr bool, prefixStdout, prefixSt return nil, err } t := &Tracker{ + ctx, stdout, stderr, make(chan string), @@ -62,6 +65,13 @@ func (t *Tracker) track(in io.ReadCloser, out chan string) { out <- scanner.Text() } if err := scanner.Err(); err != nil { + if t.ctx != nil { + select { + case <-t.ctx.Done(): + return + default: + } + } t.errChan <- err } } From 690ee2a63641e232fb1c551b27235f3df5c83195 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 23 Jan 2018 09:32:31 +0100 Subject: [PATCH 05/15] implement end2end carbon test via our go chaos toolkit 1) it's about twice as fast (on my system at least) ./scripts/qa/end2end.sh 1.17s user 0.18s system 2% cpu 1:05.40 total go test -v ./tests/simple_carbon 1.35s user 0.20s system 5% cpu 30.795 total 2) reuse of shared helper libraries with chaos stack. these helper libraries are good for all kinds of testing, not just chaos-induced 3) we're better at Go than at python --- .circleci/config.yml | 2 +- chaos/fakemetrics/fakemetrics.go | 28 ++++- chaos/fakemetrics/out/carbon/carbon.go | 65 +++++++++++ chaos/graphite/graphite.go | 5 + chaos/graphite/validate.go | 18 ++++ .../tests/simple_carbon/simple_carbon_test.go | 101 ++++++++++++++++++ scripts/qa/end2end.sh | 33 ------ scripts/qa/generate_test_data.sh | 31 ------ scripts/qa/verify_metrics_received.py | 89 --------------- 9 files changed, 215 insertions(+), 157 deletions(-) create mode 100644 chaos/fakemetrics/out/carbon/carbon.go create mode 100644 chaos/tests/simple_carbon/simple_carbon_test.go delete mode 100755 scripts/qa/end2end.sh delete mode 100755 scripts/qa/generate_test_data.sh delete mode 100755 scripts/qa/verify_metrics_received.py diff --git a/.circleci/config.yml b/.circleci/config.yml index e43e84b879..f8919c12bf 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -50,7 +50,7 @@ jobs: at: . 
- run: scripts/qa/docs.sh - run: docker load -i build_docker/metrictank.tar - - run: scripts/qa/end2end.sh + - run: go test -v ./chaos/tests/simple_carbon deploy: docker: diff --git a/chaos/fakemetrics/fakemetrics.go b/chaos/fakemetrics/fakemetrics.go index eb3245b5a4..2ad4834b00 100644 --- a/chaos/fakemetrics/fakemetrics.go +++ b/chaos/fakemetrics/fakemetrics.go @@ -5,6 +5,8 @@ import ( "log" "time" + "github.com/grafana/metrictank/chaos/fakemetrics/out" + "github.com/grafana/metrictank/chaos/fakemetrics/out/carbon" "github.com/grafana/metrictank/chaos/fakemetrics/out/kafkamdm" "github.com/grafana/metrictank/clock" "github.com/raintank/met/helper" @@ -16,7 +18,13 @@ const numPartitions = 12 var metrics []*schema.MetricData func init() { - for i := 0; i < numPartitions; i++ { + generateMetrics(numPartitions) +} + +func generateMetrics(num int) { + metrics = make([]*schema.MetricData, num) + + for i := 0; i < num; i++ { name := fmt.Sprintf("some.id.of.a.metric.%d", i) m := &schema.MetricData{ OrgId: 1, @@ -27,7 +35,7 @@ func init() { Mtype: "gauge", } m.SetId() - metrics = append(metrics, m) + metrics[i] = m } } @@ -37,6 +45,20 @@ func Kafka() { if err != nil { log.Fatal(4, "failed to create kafka-mdm output. %s", err) } + run(out) +} + +func Carbon(num int) { + stats, _ := helper.New(false, "", "standard", "", "") + out, err := carbon.New("localhost:2003", stats) + if err != nil { + log.Fatal(4, "failed to create kafka-mdm output. %s", err) + } + generateMetrics(num) + run(out) +} + +func run(out out.Out) { // advantage over regular ticker: // 1) no ticks dropped // 2) ticks come asap after the start of a new second, so we can measure better how long it took to get the data @@ -49,7 +71,7 @@ func Kafka() { } err := out.Flush(metrics) if err != nil { - panic(fmt.Sprintf("failed to send data to kafka: %s", err)) + panic(fmt.Sprintf("failed to send data to output: %s", err)) } } } diff --git a/chaos/fakemetrics/out/carbon/carbon.go b/chaos/fakemetrics/out/carbon/carbon.go new file mode 100644 index 0000000000..3fcb47c3d1 --- /dev/null +++ b/chaos/fakemetrics/out/carbon/carbon.go @@ -0,0 +1,65 @@ +package carbon + +import ( + "bytes" + "fmt" + "net" + "sync" + "time" + + "github.com/grafana/metrictank/chaos/fakemetrics/out" + "github.com/raintank/met" + "gopkg.in/raintank/schema.v1" +) + +type Carbon struct { + sync.Mutex + out.OutStats + addr string + conn net.Conn +} + +func New(addr string, stats met.Backend) (*Carbon, error) { + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + + return &Carbon{ + sync.Mutex{}, + out.NewStats(stats, "carbon"), + addr, + conn, + }, nil +} + +func (n *Carbon) Close() error { + return n.conn.Close() +} + +func (n *Carbon) Flush(metrics []*schema.MetricData) error { + if len(metrics) == 0 { + n.FlushDuration.Value(0) + return nil + } + preFlush := time.Now() + buf := bytes.NewBufferString("") + for _, m := range metrics { + buf.WriteString(fmt.Sprintf("%s %f %d\n", m.Name, m.Value, m.Time)) + } + prePub := time.Now() + n.Lock() + _, err := n.conn.Write(buf.Bytes()) + n.Unlock() + if err != nil { + n.PublishErrors.Inc(1) + return err + } + n.MessageBytes.Value(int64(buf.Len())) + n.MessageMetrics.Value(int64(len(metrics))) + n.PublishedMetrics.Inc(int64(len(metrics))) + n.PublishedMessages.Inc(1) + n.PublishDuration.Value(time.Since(prePub)) + n.FlushDuration.Value(time.Since(preFlush)) + return nil +} diff --git a/chaos/graphite/graphite.go b/chaos/graphite/graphite.go index 5590414381..f64abc13a9 100644 --- 
a/chaos/graphite/graphite.go +++ b/chaos/graphite/graphite.go @@ -47,6 +47,11 @@ func renderQuery(base, target, from string) Response { func RetryGraphite(query, from string, times int, validate Validator) (bool, Response) { return retry(query, from, times, validate, "http://localhost") } + +func RetryGraphite8080(query, from string, times int, validate Validator) (bool, Response) { + return retry(query, from, times, validate, "http://localhost:8080") +} + func RetryMT(query, from string, times int, validate Validator) (bool, Response) { return retry(query, from, times, validate, "http://localhost:6060") } diff --git a/chaos/graphite/validate.go b/chaos/graphite/validate.go index 43efe2d241..5fee696644 100644 --- a/chaos/graphite/validate.go +++ b/chaos/graphite/validate.go @@ -118,3 +118,21 @@ func ValidatorAvgWindowed(numPoints int, cmp Comparator) Validator { return true } } + +// ValidatorLenNulls returns a validator that validates that any of the series contained +// within the response, has a length of l and no more than prefix nulls up front. +func ValidatorLenNulls(prefix, l int) Validator { + return func(resp Response) bool { + for _, series := range resp.r { + if len(series.Datapoints) != l { + return false + } + for i, dp := range series.Datapoints { + if math.IsNaN(dp.Val) && i+1 > prefix { + return false + } + } + } + return true + } +} diff --git a/chaos/tests/simple_carbon/simple_carbon_test.go b/chaos/tests/simple_carbon/simple_carbon_test.go new file mode 100644 index 0000000000..205b547066 --- /dev/null +++ b/chaos/tests/simple_carbon/simple_carbon_test.go @@ -0,0 +1,101 @@ +package simple_carbon + +import ( + "context" + "fmt" + "log" + "os" + "os/exec" + "strings" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/grafana/metrictank/chaos/docker" + "github.com/grafana/metrictank/chaos/fakemetrics" + "github.com/grafana/metrictank/chaos/grafana" + "github.com/grafana/metrictank/chaos/graphite" + "github.com/grafana/metrictank/chaos/track" +) + +// TODO: cleanup when ctrl-C go test (teardown all containers) + +var tracker *track.Tracker + +const metricsPerSecond = 1000 + +func TestMain(m *testing.M) { + ctx, cancelFunc := context.WithCancel(context.Background()) + + fmt.Println("stopping docker-dev stack should it be running...") + cmd := exec.CommandContext(ctx, "docker-compose", "down") + cmd.Dir = docker.Path("docker/docker-dev") + err := cmd.Start() + if err != nil { + log.Fatal(err) + } + + fmt.Println("launching docker-dev stack...") + cmd = exec.CommandContext(ctx, docker.Path("docker/launch.sh"), "docker-dev") + + tracker, err = track.NewTracker(ctx, cmd, false, false, "launch-stdout", "launch-stderr") + if err != nil { + log.Fatal(err) + } + + err = cmd.Start() + if err != nil { + log.Fatal(err) + } + + retcode := m.Run() + + fmt.Println("stopping docker-compose stack...") + cancelFunc() + if err := cmd.Wait(); err != nil { + if strings.Contains(err.Error(), "signal: killed") { + os.Exit(retcode) + } + log.Printf("ERROR: could not cleanly shutdown running docker-compose command: %s", err) + retcode = 1 + } + + os.Exit(retcode) +} + +func TestStartup(t *testing.T) { + matchers := []track.Matcher{ + {Str: "metrictank.*metricIndex initialized.*starting data consumption$"}, + {Str: "metrictank.*carbon-in: listening on.*2003"}, + {Str: "grafana.*Initializing HTTP Server.*:3000"}, + } + select { + case <-tracker.Match(matchers): + fmt.Println("stack now running.") + fmt.Println("Go to http://localhost:3000 (and login as admin:admin) to see what's 
going on") + case <-time.After(time.Second * 40): + grafana.PostAnnotation("TestStartup:FAIL") + t.Fatal("timed out while waiting for all metrictank instances to come up") + } +} + +func TestBaseIngestWorkload(t *testing.T) { + grafana.PostAnnotation("TestBaseIngestWorkload:begin") + + go fakemetrics.Carbon(metricsPerSecond) + + suc6, resp := graphite.RetryGraphite8080("perSecond(metrictank.stats.docker-env.*.input.carbon.metrics_received.counter32)", "-8s", 18, func(resp graphite.Response) bool { + exp := []string{ + "perSecond(metrictank.stats.docker-env.default.input.carbon.metrics_received.counter32)", + } + a := graphite.ValidateTargets(exp)(resp) + b := graphite.ValidatorLenNulls(1, 8)(resp) + c := graphite.ValidatorAvgWindowed(8, graphite.Ge(metricsPerSecond))(resp) + log.Printf("condition target names %t - condition len & nulls %t - condition avg value %t", a, b, c) + return a && b && c + }) + if !suc6 { + grafana.PostAnnotation("TestBaseIngestWorkload:FAIL") + t.Fatalf("cluster did not reach a state where the MT instance receives 4 points per second. last response was: %s", spew.Sdump(resp)) + } +} diff --git a/scripts/qa/end2end.sh b/scripts/qa/end2end.sh deleted file mode 100755 index 6f0d2a2ca4..0000000000 --- a/scripts/qa/end2end.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/sh - -log () { - echo "$(date +'%Y/%m/%d %H:%M:%S') $@" -} - - -DOCKER_COMPOSE_FILE="docker/docker-dev/docker-compose.yml" - -log "running docker-compose up" -docker-compose -f $DOCKER_COMPOSE_FILE up -d - -# wait for carbon input before sending data -export WAIT_HOSTS="127.0.0.1:2003" -export WAIT_TIMEOUT=120 -export METRICS_PER_SECOND=1000 -scripts/util/wait_for_endpoint.sh scripts/qa/generate_test_data.sh start - -log "sleeping 30s to give fakemetrics some warmup time" -sleep 30 -log "sleep over. time to verify" - -# verify the metrics have arrived in graphite and keep exit status -scripts/qa/verify_metrics_received.py 127.0.0.1 8080 10 $METRICS_PER_SECOND -RESULT=$? - -log "got result. stopping fakemetrics" -scripts/qa/generate_test_data.sh stop -log "running docker-compose down" -docker-compose -f $DOCKER_COMPOSE_FILE down -log "docker-compose down: complete" - -exit $RESULT diff --git a/scripts/qa/generate_test_data.sh b/scripts/qa/generate_test_data.sh deleted file mode 100755 index 5e405b4073..0000000000 --- a/scripts/qa/generate_test_data.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/sh - -FAKEMETRICS_REPO="github.com/raintank/fakemetrics" -FAKEMETRICS_PID="/tmp/fakemetrics.pid" - -if [ $# -eq 0 ] -then - echo "$0 start|stop" - exit 1 -fi - -ACTION=$1 -METRICS_PER_SEC=${METRICS_PER_SEC:-1000} -HOST=${HOST:-"127.0.0.1"} -PORT=${PORT:-2003} - -# ensure that $GOPATH is part of $PATH -GOBIN="$GOPATH/bin" -export PATH=$(echo "$PATH"| grep -q "$GOBIN" && echo "$PATH" || echo "$PATH:$GOBIN") - -case "$ACTION" in - "start") - go get $FAKEMETRICS_REPO - echo "generating test data..." - fakemetrics feed --carbon-addr $HOST:$PORT --orgs 1 --mpo $METRICS_PER_SEC & - echo $! 
> $FAKEMETRICS_PID - ;; - "stop") - pkill -F $FAKEMETRICS_PID - ;; -esac diff --git a/scripts/qa/verify_metrics_received.py b/scripts/qa/verify_metrics_received.py deleted file mode 100755 index 2646415d8b..0000000000 --- a/scripts/qa/verify_metrics_received.py +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/python - -import sys -import time -import requests -import json - - -def error(msg): - sys.stderr.write(msg + '\n') - sys.exit(1) - - -if len(sys.argv) < 4 or not all([x.isdigit() for x in sys.argv[2:]]): - error( - '{cmd} host port check_seconds expected_value' - .format(cmd=sys.argv[0]) - ) - -request_parameters = { - 'url': 'http://{host}:{port}/render'.format( - host=sys.argv[1], - port=sys.argv[2], - ), - 'params': { - 'from': '-{secs}sec'.format(secs=sys.argv[3]), - 'format': 'json', - 'maxDataPoints': sys.argv[3], - }, - 'data': { - 'target': - 'perSecond(metrictank.stats.docker-env.default.input.carbon.metricdata.received.counter32)', - }, -} - -# wait while metrics are being generated -time.sleep(int(sys.argv[3])) - -result = requests.post(**request_parameters) - -if result.status_code != 200: - error( - 'received bad response status code: {code} - reason: {reason}' - .format(code=result.status_code, reason=result.text) - ) - -try: - parsed_result = json.loads(result.text) -except Exception: - error( - 'failed to parse response: {text}' - .format(text=result.text) - ) - -# verify the format and content of the response is as we expect it -# note : since we got a perSecond(), the first value is always null, we only use points 2 and onwards -if ( - len(parsed_result) < 1 or - 'datapoints' not in parsed_result[0] or - not all([len(x) >= 2 for x in parsed_result[0]['datapoints']]) or - not all([ - isinstance(x[0], (int, float)) - for x in parsed_result[0]['datapoints'][1:] - ]) -): - error( - 'received unexpected response:\n{response}' - .format(response=result.text) - ) - -datapoints = [float(x[0]) for x in parsed_result[0]['datapoints'][1:]] -datapoints_avg = sum(datapoints)/len(datapoints) -expected = float(sys.argv[4]) - -if datapoints_avg > expected: - print('All good:') - print(result.text) - sys.exit(0) -else: - error( - '{received} is lower than {expected}.\n' - 'Based on received data:\n' - '{data}' - .format( - received=datapoints_avg, - expected=expected, - data=result.text, - ) - ) From 909c55f7ed4e4cce3bea1ea16952939245a0894a Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 23 Jan 2018 13:30:26 +0100 Subject: [PATCH 06/15] convey that this is no longer just chaos stuff --- .circleci/config.yml | 4 ++-- stacktest/README.md | 3 +++ {chaos => stacktest}/docker/docker.go | 2 +- {chaos => stacktest}/docker/path.go | 0 {chaos => stacktest}/fakemetrics/fakemetrics.go | 6 +++--- .../fakemetrics/out/carbon/carbon.go | 2 +- .../fakemetrics/out/kafkamdm/kafkamdm.go | 2 +- {chaos => stacktest}/fakemetrics/out/out.go | 0 {chaos => stacktest}/fakemetrics/out/saramahelper.go | 0 {chaos => stacktest}/grafana/grafana.go | 0 {chaos => stacktest}/graphite/comparator.go | 0 {chaos => stacktest}/graphite/graphite.go | 0 {chaos => stacktest}/graphite/response.go | 0 {chaos => stacktest}/graphite/validate.go | 0 {chaos => stacktest/tests/chaos_cluster}/README.md | 0 .../tests/chaos_cluster}/analyze-mt4-out.sh | 0 .../tests/chaos_cluster/chaos_cluster_test.go | 10 +++++----- .../tests/end2end_carbon/end2end_carbon_test.go | 12 ++++++------ {chaos => stacktest}/track/tracker.go | 0 19 files changed, 22 insertions(+), 19 deletions(-) create mode 100644 stacktest/README.md rename {chaos => 
stacktest}/docker/docker.go (98%) rename {chaos => stacktest}/docker/path.go (100%) rename {chaos => stacktest}/fakemetrics/fakemetrics.go (88%) rename {chaos => stacktest}/fakemetrics/out/carbon/carbon.go (95%) rename {chaos => stacktest}/fakemetrics/out/kafkamdm/kafkamdm.go (98%) rename {chaos => stacktest}/fakemetrics/out/out.go (100%) rename {chaos => stacktest}/fakemetrics/out/saramahelper.go (100%) rename {chaos => stacktest}/grafana/grafana.go (100%) rename {chaos => stacktest}/graphite/comparator.go (100%) rename {chaos => stacktest}/graphite/graphite.go (100%) rename {chaos => stacktest}/graphite/response.go (100%) rename {chaos => stacktest}/graphite/validate.go (100%) rename {chaos => stacktest/tests/chaos_cluster}/README.md (100%) rename {chaos => stacktest/tests/chaos_cluster}/analyze-mt4-out.sh (100%) rename {chaos => stacktest}/tests/chaos_cluster/chaos_cluster_test.go (96%) rename chaos/tests/simple_carbon/simple_carbon_test.go => stacktest/tests/end2end_carbon/end2end_carbon_test.go (90%) rename {chaos => stacktest}/track/tracker.go (100%) diff --git a/.circleci/config.yml b/.circleci/config.yml index f8919c12bf..241d634650 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -25,7 +25,7 @@ jobs: - image: circleci/golang:1.10 steps: - checkout - - run: go test -v -race $(go list ./... | grep -v github.com/grafana/metrictank/chaos) + - run: go test -v -race $(go list ./... | grep -v github.com/grafana/metrictank/stacktest) qa: working_directory: /go/src/github.com/grafana/metrictank @@ -50,7 +50,7 @@ jobs: at: . - run: scripts/qa/docs.sh - run: docker load -i build_docker/metrictank.tar - - run: go test -v ./chaos/tests/simple_carbon + - run: go test -v ./stacktest/tests/end2end_carbon deploy: docker: diff --git a/stacktest/README.md b/stacktest/README.md new file mode 100644 index 0000000000..6eab76354e --- /dev/null +++ b/stacktest/README.md @@ -0,0 +1,3 @@ +here are various go tests that spin up metrictank via a docker stack and assert correct function + +see tests directory diff --git a/chaos/docker/docker.go b/stacktest/docker/docker.go similarity index 98% rename from chaos/docker/docker.go rename to stacktest/docker/docker.go index fe907a0025..ea1792d7af 100644 --- a/chaos/docker/docker.go +++ b/stacktest/docker/docker.go @@ -7,7 +7,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/client" - "github.com/grafana/metrictank/chaos/track" + "github.com/grafana/metrictank/stacktest/track" ) var cli *client.Client diff --git a/chaos/docker/path.go b/stacktest/docker/path.go similarity index 100% rename from chaos/docker/path.go rename to stacktest/docker/path.go diff --git a/chaos/fakemetrics/fakemetrics.go b/stacktest/fakemetrics/fakemetrics.go similarity index 88% rename from chaos/fakemetrics/fakemetrics.go rename to stacktest/fakemetrics/fakemetrics.go index 2ad4834b00..c0520136ea 100644 --- a/chaos/fakemetrics/fakemetrics.go +++ b/stacktest/fakemetrics/fakemetrics.go @@ -5,9 +5,9 @@ import ( "log" "time" - "github.com/grafana/metrictank/chaos/fakemetrics/out" - "github.com/grafana/metrictank/chaos/fakemetrics/out/carbon" - "github.com/grafana/metrictank/chaos/fakemetrics/out/kafkamdm" + "github.com/grafana/metrictank/stacktest/fakemetrics/out" + "github.com/grafana/metrictank/stacktest/fakemetrics/out/carbon" + "github.com/grafana/metrictank/stacktest/fakemetrics/out/kafkamdm" "github.com/grafana/metrictank/clock" "github.com/raintank/met/helper" "gopkg.in/raintank/schema.v1" diff --git a/chaos/fakemetrics/out/carbon/carbon.go 
b/stacktest/fakemetrics/out/carbon/carbon.go similarity index 95% rename from chaos/fakemetrics/out/carbon/carbon.go rename to stacktest/fakemetrics/out/carbon/carbon.go index 3fcb47c3d1..fe15bfa073 100644 --- a/chaos/fakemetrics/out/carbon/carbon.go +++ b/stacktest/fakemetrics/out/carbon/carbon.go @@ -7,7 +7,7 @@ import ( "sync" "time" - "github.com/grafana/metrictank/chaos/fakemetrics/out" + "github.com/grafana/metrictank/stacktest/fakemetrics/out" "github.com/raintank/met" "gopkg.in/raintank/schema.v1" ) diff --git a/chaos/fakemetrics/out/kafkamdm/kafkamdm.go b/stacktest/fakemetrics/out/kafkamdm/kafkamdm.go similarity index 98% rename from chaos/fakemetrics/out/kafkamdm/kafkamdm.go rename to stacktest/fakemetrics/out/kafkamdm/kafkamdm.go index 1de9680cf5..844895765b 100644 --- a/chaos/fakemetrics/out/kafkamdm/kafkamdm.go +++ b/stacktest/fakemetrics/out/kafkamdm/kafkamdm.go @@ -9,7 +9,7 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/grafana/metrictank/chaos/fakemetrics/out" + "github.com/grafana/metrictank/stacktest/fakemetrics/out" p "github.com/grafana/metrictank/cluster/partitioner" "github.com/raintank/met" "github.com/raintank/worldping-api/pkg/log" diff --git a/chaos/fakemetrics/out/out.go b/stacktest/fakemetrics/out/out.go similarity index 100% rename from chaos/fakemetrics/out/out.go rename to stacktest/fakemetrics/out/out.go diff --git a/chaos/fakemetrics/out/saramahelper.go b/stacktest/fakemetrics/out/saramahelper.go similarity index 100% rename from chaos/fakemetrics/out/saramahelper.go rename to stacktest/fakemetrics/out/saramahelper.go diff --git a/chaos/grafana/grafana.go b/stacktest/grafana/grafana.go similarity index 100% rename from chaos/grafana/grafana.go rename to stacktest/grafana/grafana.go diff --git a/chaos/graphite/comparator.go b/stacktest/graphite/comparator.go similarity index 100% rename from chaos/graphite/comparator.go rename to stacktest/graphite/comparator.go diff --git a/chaos/graphite/graphite.go b/stacktest/graphite/graphite.go similarity index 100% rename from chaos/graphite/graphite.go rename to stacktest/graphite/graphite.go diff --git a/chaos/graphite/response.go b/stacktest/graphite/response.go similarity index 100% rename from chaos/graphite/response.go rename to stacktest/graphite/response.go diff --git a/chaos/graphite/validate.go b/stacktest/graphite/validate.go similarity index 100% rename from chaos/graphite/validate.go rename to stacktest/graphite/validate.go diff --git a/chaos/README.md b/stacktest/tests/chaos_cluster/README.md similarity index 100% rename from chaos/README.md rename to stacktest/tests/chaos_cluster/README.md diff --git a/chaos/analyze-mt4-out.sh b/stacktest/tests/chaos_cluster/analyze-mt4-out.sh similarity index 100% rename from chaos/analyze-mt4-out.sh rename to stacktest/tests/chaos_cluster/analyze-mt4-out.sh diff --git a/chaos/tests/chaos_cluster/chaos_cluster_test.go b/stacktest/tests/chaos_cluster/chaos_cluster_test.go similarity index 96% rename from chaos/tests/chaos_cluster/chaos_cluster_test.go rename to stacktest/tests/chaos_cluster/chaos_cluster_test.go index 23b6319314..f80c66562d 100644 --- a/chaos/tests/chaos_cluster/chaos_cluster_test.go +++ b/stacktest/tests/chaos_cluster/chaos_cluster_test.go @@ -11,11 +11,11 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/grafana/metrictank/chaos/docker" - "github.com/grafana/metrictank/chaos/fakemetrics" - "github.com/grafana/metrictank/chaos/grafana" - "github.com/grafana/metrictank/chaos/graphite" - 
"github.com/grafana/metrictank/chaos/track" + "github.com/grafana/metrictank/stacktest/docker" + "github.com/grafana/metrictank/stacktest/fakemetrics" + "github.com/grafana/metrictank/stacktest/grafana" + "github.com/grafana/metrictank/stacktest/graphite" + "github.com/grafana/metrictank/stacktest/track" ) // TODO: cleanup when ctrl-C go test (teardown all containers) diff --git a/chaos/tests/simple_carbon/simple_carbon_test.go b/stacktest/tests/end2end_carbon/end2end_carbon_test.go similarity index 90% rename from chaos/tests/simple_carbon/simple_carbon_test.go rename to stacktest/tests/end2end_carbon/end2end_carbon_test.go index 205b547066..b770e0f21b 100644 --- a/chaos/tests/simple_carbon/simple_carbon_test.go +++ b/stacktest/tests/end2end_carbon/end2end_carbon_test.go @@ -1,4 +1,4 @@ -package simple_carbon +package end2end_carbon import ( "context" @@ -11,11 +11,11 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/grafana/metrictank/chaos/docker" - "github.com/grafana/metrictank/chaos/fakemetrics" - "github.com/grafana/metrictank/chaos/grafana" - "github.com/grafana/metrictank/chaos/graphite" - "github.com/grafana/metrictank/chaos/track" + "github.com/grafana/metrictank/stacktest/docker" + "github.com/grafana/metrictank/stacktest/fakemetrics" + "github.com/grafana/metrictank/stacktest/grafana" + "github.com/grafana/metrictank/stacktest/graphite" + "github.com/grafana/metrictank/stacktest/track" ) // TODO: cleanup when ctrl-C go test (teardown all containers) diff --git a/chaos/track/tracker.go b/stacktest/track/tracker.go similarity index 100% rename from chaos/track/tracker.go rename to stacktest/track/tracker.go From bc135e57fa6cb38c49ee69c8132e7b9e1eb368a7 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 24 Jan 2018 12:07:10 +0100 Subject: [PATCH 07/15] fix script to properly terminate docker-compose child with previous approach, while docker-compose up was running, bash would just wait on it, and be unable to invoke its trap handler. apparently, when running the script interactively, ctrl-C would reach docker-compose up directly, then invoke the trap handler, but when running the script as subprocess, the signal wouldn't reach anyone. see https://unix.stackexchange.com/questions/146756/forward-sigterm-to-child-in-bash --- docker/launch.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docker/launch.sh b/docker/launch.sh index e0a250a387..04fd2332a5 100755 --- a/docker/launch.sh +++ b/docker/launch.sh @@ -26,9 +26,12 @@ cd $basedir/$env trap ctrl_c INT function ctrl_c() { + kill -INT "$child" 2>/dev/null docker-compose down } docker-compose down ../extra/populate-grafana.sh $PWD & -docker-compose up --force-recreate +docker-compose up --force-recreate & +child=$! +wait "$child" From 98318be660e6e019dbdb59c71da74b1baf795585 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 24 Jan 2018 12:23:40 +0100 Subject: [PATCH 08/15] shut down output cleanly so we don't get connection/write errors when cleaning up the tests and shutting down the docker stack. 
---
 stacktest/fakemetrics/fakemetrics.go          | 74 ++++++++++++-------
 stacktest/fakemetrics/out/carbon/carbon.go    | 24 +++++-
 .../tests/chaos_cluster/chaos_cluster_test.go |  6 +-
 .../end2end_carbon/end2end_carbon_test.go     |  4 +-
 4 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/stacktest/fakemetrics/fakemetrics.go b/stacktest/fakemetrics/fakemetrics.go
index c0520136ea..c91d3457ac 100644
--- a/stacktest/fakemetrics/fakemetrics.go
+++ b/stacktest/fakemetrics/fakemetrics.go
@@ -5,24 +5,17 @@ import (
 	"log"
 	"time"
 
+	"github.com/grafana/metrictank/clock"
 	"github.com/grafana/metrictank/stacktest/fakemetrics/out"
 	"github.com/grafana/metrictank/stacktest/fakemetrics/out/carbon"
 	"github.com/grafana/metrictank/stacktest/fakemetrics/out/kafkamdm"
-	"github.com/grafana/metrictank/clock"
+	"github.com/raintank/met"
 	"github.com/raintank/met/helper"
 	"gopkg.in/raintank/schema.v1"
 )
 
-const numPartitions = 12
-
-var metrics []*schema.MetricData
-
-func init() {
-	generateMetrics(numPartitions)
-}
-
-func generateMetrics(num int) {
-	metrics = make([]*schema.MetricData, num)
+func generateMetrics(num int) []*schema.MetricData {
+	metrics := make([]*schema.MetricData, num)
 
 	for i := 0; i < num; i++ {
 		name := fmt.Sprintf("some.id.of.a.metric.%d", i)
@@ -37,41 +30,72 @@ func generateMetrics(num int) {
 		m.SetId()
 		metrics[i] = m
 	}
+
+	return metrics
+}
+
+type FakeMetrics struct {
+	o       out.Out
+	metrics []*schema.MetricData
+	close   chan struct{}
+	closed  bool
+}
+
+func NewFakeMetrics(metrics []*schema.MetricData, o out.Out, stats met.Backend) *FakeMetrics {
+	fm := &FakeMetrics{
+		o:       o,
+		metrics: metrics,
+		close:   make(chan struct{}),
+	}
+	go fm.run()
+	return fm
 }
 
-func Kafka() {
+func NewKafka(num int) *FakeMetrics {
 	stats, _ := helper.New(false, "", "standard", "", "")
 	out, err := kafkamdm.New("mdm", []string{"localhost:9092"}, "none", stats, "lastNum")
 	if err != nil {
 		log.Fatal(4, "failed to create kafka-mdm output. %s", err)
 	}
-	run(out)
+	return NewFakeMetrics(generateMetrics(num), out, stats)
 }
 
-func Carbon(num int) {
+func NewCarbon(num int) *FakeMetrics {
 	stats, _ := helper.New(false, "", "standard", "", "")
 	out, err := carbon.New("localhost:2003", stats)
 	if err != nil {
 		log.Fatal(4, "failed to create kafka-mdm output.
%s", err) } - generateMetrics(num) - run(out) + return NewFakeMetrics(generateMetrics(num), out, stats) } -func run(out out.Out) { +func (f *FakeMetrics) Close() error { + if f.closed { + return nil + } + f.close <- struct{}{} + return f.o.Close() +} + +func (f *FakeMetrics) run() { // advantage over regular ticker: // 1) no ticks dropped // 2) ticks come asap after the start of a new second, so we can measure better how long it took to get the data ticker := clock.AlignedTick(time.Second) - for tick := range ticker { - unix := tick.Unix() - for i := range metrics { - metrics[i].Time = unix - } - err := out.Flush(metrics) - if err != nil { - panic(fmt.Sprintf("failed to send data to output: %s", err)) + for { + select { + case <-f.close: + return + case tick := <-ticker: + unix := tick.Unix() + for i := range f.metrics { + f.metrics[i].Time = unix + } + err := f.o.Flush(f.metrics) + if err != nil { + panic(fmt.Sprintf("failed to send data to output: %s", err)) + } } } } diff --git a/stacktest/fakemetrics/out/carbon/carbon.go b/stacktest/fakemetrics/out/carbon/carbon.go index fe15bfa073..f60dee5313 100644 --- a/stacktest/fakemetrics/out/carbon/carbon.go +++ b/stacktest/fakemetrics/out/carbon/carbon.go @@ -2,6 +2,7 @@ package carbon import ( "bytes" + "errors" "fmt" "net" "sync" @@ -12,11 +13,14 @@ import ( "gopkg.in/raintank/schema.v1" ) +var errClosed = errors.New("output is closed") + type Carbon struct { sync.Mutex out.OutStats - addr string - conn net.Conn + addr string + conn net.Conn + closed bool } func New(addr string, stats met.Backend) (*Carbon, error) { @@ -30,11 +34,20 @@ func New(addr string, stats met.Backend) (*Carbon, error) { out.NewStats(stats, "carbon"), addr, conn, + false, }, nil } func (n *Carbon) Close() error { - return n.conn.Close() + n.Lock() + if n.closed { + n.Unlock() + return nil + } + err := n.conn.Close() + n.closed = true + n.Unlock() + return err } func (n *Carbon) Flush(metrics []*schema.MetricData) error { @@ -42,13 +55,16 @@ func (n *Carbon) Flush(metrics []*schema.MetricData) error { n.FlushDuration.Value(0) return nil } + n.Lock() + if n.closed { + return errClosed + } preFlush := time.Now() buf := bytes.NewBufferString("") for _, m := range metrics { buf.WriteString(fmt.Sprintf("%s %f %d\n", m.Name, m.Value, m.Time)) } prePub := time.Now() - n.Lock() _, err := n.conn.Write(buf.Bytes()) n.Unlock() if err != nil { diff --git a/stacktest/tests/chaos_cluster/chaos_cluster_test.go b/stacktest/tests/chaos_cluster/chaos_cluster_test.go index f80c66562d..ef1603d151 100644 --- a/stacktest/tests/chaos_cluster/chaos_cluster_test.go +++ b/stacktest/tests/chaos_cluster/chaos_cluster_test.go @@ -20,7 +20,10 @@ import ( // TODO: cleanup when ctrl-C go test (teardown all containers) +const numPartitions = 12 + var tracker *track.Tracker +var fm *fakemetrics.FakeMetrics func TestMain(m *testing.M) { ctx, cancelFunc := context.WithCancel(context.Background()) @@ -48,6 +51,7 @@ func TestMain(m *testing.M) { } retcode := m.Run() + fm.Close() fmt.Println("stopping docker-compose stack...") cancelFunc() @@ -82,7 +86,7 @@ func TestClusterStartup(t *testing.T) { func TestClusterBaseIngestWorkload(t *testing.T) { grafana.PostAnnotation("TestClusterBaseIngestWorkload:begin") - go fakemetrics.Kafka() + fm = fakemetrics.NewKafka(numPartitions) suc6, resp := graphite.RetryGraphite("perSecond(metrictank.stats.docker-cluster.*.input.kafka-mdm.metrics_received.counter32)", "-8s", 18, func(resp graphite.Response) bool { exp := []string{ diff --git 
a/stacktest/tests/end2end_carbon/end2end_carbon_test.go b/stacktest/tests/end2end_carbon/end2end_carbon_test.go index b770e0f21b..3bacb453c8 100644 --- a/stacktest/tests/end2end_carbon/end2end_carbon_test.go +++ b/stacktest/tests/end2end_carbon/end2end_carbon_test.go @@ -21,6 +21,7 @@ import ( // TODO: cleanup when ctrl-C go test (teardown all containers) var tracker *track.Tracker +var fm *fakemetrics.FakeMetrics const metricsPerSecond = 1000 @@ -49,6 +50,7 @@ func TestMain(m *testing.M) { } retcode := m.Run() + fm.Close() fmt.Println("stopping docker-compose stack...") cancelFunc() @@ -82,7 +84,7 @@ func TestStartup(t *testing.T) { func TestBaseIngestWorkload(t *testing.T) { grafana.PostAnnotation("TestBaseIngestWorkload:begin") - go fakemetrics.Carbon(metricsPerSecond) + fm = fakemetrics.NewCarbon(metricsPerSecond) suc6, resp := graphite.RetryGraphite8080("perSecond(metrictank.stats.docker-env.*.input.carbon.metrics_received.counter32)", "-8s", 18, func(resp graphite.Response) bool { exp := []string{ From c6061344c940e384bde6ee32fffed5bb8c0e33db Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 24 Jan 2018 12:45:01 +0100 Subject: [PATCH 09/15] do away with the contexts * context for Command -> when you call cancelFunc() it uses sigkill we need the process to be able to clean up elegantly, so we need a more gentle signal this was causing us not to be able to shut down the tests properly * no need for context in tracker either, because we can simply complete reads before waiting for command --- stacktest/docker/docker.go | 2 +- .../tests/chaos_cluster/chaos_cluster_test.go | 2 +- .../end2end_carbon/end2end_carbon_test.go | 32 +++++++++---- stacktest/track/tracker.go | 46 +++++++++++++------ 4 files changed, 55 insertions(+), 27 deletions(-) diff --git a/stacktest/docker/docker.go b/stacktest/docker/docker.go index ea1792d7af..2b77349a19 100644 --- a/stacktest/docker/docker.go +++ b/stacktest/docker/docker.go @@ -109,7 +109,7 @@ func IsolateOut(name, dur string, targets ...string) error { } // log all pumba's output - _, err = track.NewTracker(context.TODO(), cmd, false, false, "pumba-stdout", "pumba-stderr") + _, err = track.NewTracker(cmd, false, false, "pumba-stdout", "pumba-stderr") if err != nil { return err } diff --git a/stacktest/tests/chaos_cluster/chaos_cluster_test.go b/stacktest/tests/chaos_cluster/chaos_cluster_test.go index ef1603d151..8f448034eb 100644 --- a/stacktest/tests/chaos_cluster/chaos_cluster_test.go +++ b/stacktest/tests/chaos_cluster/chaos_cluster_test.go @@ -40,7 +40,7 @@ func TestMain(m *testing.M) { cmd = exec.CommandContext(ctx, docker.Path("docker/launch.sh"), "docker-chaos") cmd.Env = append(cmd.Env, "MT_CLUSTER_MIN_AVAILABLE_SHARDS=12") - tracker, err = track.NewTracker(ctx, cmd, false, false, "launch-stdout", "launch-stderr") + tracker, err = track.NewTracker(cmd, false, false, "launch-stdout", "launch-stderr") if err != nil { log.Fatal(err) } diff --git a/stacktest/tests/end2end_carbon/end2end_carbon_test.go b/stacktest/tests/end2end_carbon/end2end_carbon_test.go index 3bacb453c8..42920399bf 100644 --- a/stacktest/tests/end2end_carbon/end2end_carbon_test.go +++ b/stacktest/tests/end2end_carbon/end2end_carbon_test.go @@ -1,12 +1,11 @@ package end2end_carbon import ( - "context" "fmt" "log" "os" "os/exec" - "strings" + "syscall" "testing" "time" @@ -26,20 +25,31 @@ var fm *fakemetrics.FakeMetrics const metricsPerSecond = 1000 func TestMain(m *testing.M) { - ctx, cancelFunc := context.WithCancel(context.Background()) - fmt.Println("stopping 
docker-dev stack should it be running...") - cmd := exec.CommandContext(ctx, "docker-compose", "down") + cmd := exec.Command("docker-compose", "down") cmd.Dir = docker.Path("docker/docker-dev") - err := cmd.Start() + var err error + tracker, err = track.NewTracker(cmd, false, false, "compose-down-stdout", "compose-down-stderr") if err != nil { log.Fatal(err) } + err = cmd.Start() + if err != nil { + log.Fatal(err) + } + // note: even when we don't care about the output, it's best to consume it before calling cmd.Wait() + // even though the cmd.Wait docs say it will wait for stdout/stderr copying to complete + // however the docs for cmd.StdoutPipe say "it is incorrect to call Wait before all reads from the pipe have completed" + tracker.Wait() + if err := cmd.Wait(); err != nil { + log.Printf("ERROR: could not cleanly shutdown running docker-compose down command: %s", err) + os.Exit(2) + } fmt.Println("launching docker-dev stack...") - cmd = exec.CommandContext(ctx, docker.Path("docker/launch.sh"), "docker-dev") + cmd = exec.Command(docker.Path("docker/launch.sh"), "docker-dev") - tracker, err = track.NewTracker(ctx, cmd, false, false, "launch-stdout", "launch-stderr") + tracker, err = track.NewTracker(cmd, false, false, "launch-stdout", "launch-stderr") if err != nil { log.Fatal(err) } @@ -53,9 +63,11 @@ func TestMain(m *testing.M) { fm.Close() fmt.Println("stopping docker-compose stack...") - cancelFunc() + cmd.Process.Signal(syscall.SIGINT) + tracker.Wait() if err := cmd.Wait(); err != nil { - if strings.Contains(err.Error(), "signal: killed") { + // 130 means ctrl-C (interrupt) which is what we want + if err.Error() == "exit status 130" { os.Exit(retcode) } log.Printf("ERROR: could not cleanly shutdown running docker-compose command: %s", err) diff --git a/stacktest/track/tracker.go b/stacktest/track/tracker.go index dbbd930087..a5266a2941 100644 --- a/stacktest/track/tracker.go +++ b/stacktest/track/tracker.go @@ -2,17 +2,16 @@ package track import ( "bufio" - "context" "fmt" "io" "os/exec" "regexp" + "sync" ) // Tracker allows to track stdout and stderr of running commands // and wait for certain messages to appear type Tracker struct { - ctx context.Context stdout io.ReadCloser stderr io.ReadCloser stdoutChan chan string @@ -23,9 +22,10 @@ type Tracker struct { logStderr chan bool prefixStdout string prefixStderr string + wg sync.WaitGroup } -func NewTracker(ctx context.Context, cmd *exec.Cmd, logStdout, logStderr bool, prefixStdout, prefixStderr string) (*Tracker, error) { +func NewTracker(cmd *exec.Cmd, logStdout, logStderr bool, prefixStdout, prefixStderr string) (*Tracker, error) { stdout, err := cmd.StdoutPipe() if err != nil { return nil, err @@ -35,7 +35,6 @@ func NewTracker(ctx context.Context, cmd *exec.Cmd, logStdout, logStderr bool, p return nil, err } t := &Tracker{ - ctx, stdout, stderr, make(chan string), @@ -46,6 +45,7 @@ func NewTracker(ctx context.Context, cmd *exec.Cmd, logStdout, logStderr bool, p make(chan bool), prefixStdout, prefixStderr, + sync.WaitGroup{}, } if prefixStdout == "" { t.prefixStdout = "stdout:" @@ -55,6 +55,7 @@ func NewTracker(ctx context.Context, cmd *exec.Cmd, logStdout, logStderr bool, p } go t.track(t.stdout, t.stdoutChan) go t.track(t.stderr, t.stderrChan) + t.wg.Add(1) go t.manage(logStdout, logStderr) return t, nil } @@ -62,21 +63,19 @@ func NewTracker(ctx context.Context, cmd *exec.Cmd, logStdout, logStderr bool, p func (t *Tracker) track(in io.ReadCloser, out chan string) { scanner := bufio.NewScanner(in) for scanner.Scan() { - out <- 
scanner.Text() + read := scanner.Text() + out <- read } - if err := scanner.Err(); err != nil { - if t.ctx != nil { - select { - case <-t.ctx.Done(): - return - default: - } - } + err := scanner.Err() + if err != nil { t.errChan <- err } + close(out) } func (t *Tracker) manage(logStdout, logStderr bool) { + var doneStdout bool + var doneStderr bool var matcherCtx []MatcherCtx for { select { @@ -86,7 +85,11 @@ func (t *Tracker) manage(logStdout, logStderr bool) { logStderr = t case m := <-t.newMatcherCtx: matcherCtx = append(matcherCtx, m) - case str := <-t.stdoutChan: + case str, ok := <-t.stdoutChan: + if !ok { + doneStdout = true + break + } if logStdout { fmt.Println(t.prefixStdout, str) } @@ -97,7 +100,11 @@ func (t *Tracker) manage(logStdout, logStderr bool) { } } matcherCtx = tmp - case str := <-t.stderrChan: + case str, ok := <-t.stderrChan: + if !ok { + doneStderr = true + break + } if logStderr { fmt.Println(t.prefixStderr, str) } @@ -111,6 +118,10 @@ func (t *Tracker) manage(logStdout, logStderr bool) { case err := <-t.errChan: panic(err) } + if doneStdout && doneStderr { + t.wg.Done() + return + } } } @@ -170,3 +181,8 @@ func (t *Tracker) LogStdout(b bool) { func (t *Tracker) LogStderr(b bool) { t.logStderr <- b } + +// Wait waits until stdout and stdin are closed +func (t *Tracker) Wait() { + t.wg.Wait() +} From aece05290e27163e3810a43bc0c36b3a55734615 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 24 Jan 2018 12:47:29 +0100 Subject: [PATCH 10/15] typo --- stacktest/tests/end2end_carbon/end2end_carbon_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stacktest/tests/end2end_carbon/end2end_carbon_test.go b/stacktest/tests/end2end_carbon/end2end_carbon_test.go index 42920399bf..9eac0b76cb 100644 --- a/stacktest/tests/end2end_carbon/end2end_carbon_test.go +++ b/stacktest/tests/end2end_carbon/end2end_carbon_test.go @@ -110,6 +110,6 @@ func TestBaseIngestWorkload(t *testing.T) { }) if !suc6 { grafana.PostAnnotation("TestBaseIngestWorkload:FAIL") - t.Fatalf("cluster did not reach a state where the MT instance receives 4 points per second. last response was: %s", spew.Sdump(resp)) + t.Fatalf("cluster did not reach a state where the MT instance processes at least %d points per second. 
last response was: %s", metricsPerSecond, spew.Sdump(resp)) } } From 22bc5f7c40bcf788b94c7e99655bcd06b06433be Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 24 Jan 2018 12:55:55 +0100 Subject: [PATCH 11/15] no need to run `docker-compose down` since launch.sh already does it --- .../end2end_carbon/end2end_carbon_test.go | 27 ++++--------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/stacktest/tests/end2end_carbon/end2end_carbon_test.go b/stacktest/tests/end2end_carbon/end2end_carbon_test.go index 9eac0b76cb..3b9f11c318 100644 --- a/stacktest/tests/end2end_carbon/end2end_carbon_test.go +++ b/stacktest/tests/end2end_carbon/end2end_carbon_test.go @@ -25,29 +25,9 @@ var fm *fakemetrics.FakeMetrics const metricsPerSecond = 1000 func TestMain(m *testing.M) { - fmt.Println("stopping docker-dev stack should it be running...") - cmd := exec.Command("docker-compose", "down") - cmd.Dir = docker.Path("docker/docker-dev") - var err error - tracker, err = track.NewTracker(cmd, false, false, "compose-down-stdout", "compose-down-stderr") - if err != nil { - log.Fatal(err) - } - err = cmd.Start() - if err != nil { - log.Fatal(err) - } - // note: even when we don't care about the output, it's best to consume it before calling cmd.Wait() - // even though the cmd.Wait docs say it will wait for stdout/stderr copying to complete - // however the docs for cmd.StdoutPipe say "it is incorrect to call Wait before all reads from the pipe have completed" - tracker.Wait() - if err := cmd.Wait(); err != nil { - log.Printf("ERROR: could not cleanly shutdown running docker-compose down command: %s", err) - os.Exit(2) - } - fmt.Println("launching docker-dev stack...") - cmd = exec.Command(docker.Path("docker/launch.sh"), "docker-dev") + cmd := exec.Command(docker.Path("docker/launch.sh"), "docker-dev") + var err error tracker, err = track.NewTracker(cmd, false, false, "launch-stdout", "launch-stderr") if err != nil { @@ -64,6 +44,9 @@ func TestMain(m *testing.M) { fmt.Println("stopping docker-compose stack...") cmd.Process.Signal(syscall.SIGINT) + // note: even when we don't care about the output, it's best to consume it before calling cmd.Wait() + // even though the cmd.Wait docs say it will wait for stdout/stderr copying to complete + // however the docs for cmd.StdoutPipe say "it is incorrect to call Wait before all reads from the pipe have completed" tracker.Wait() if err := cmd.Wait(); err != nil { // 130 means ctrl-C (interrupt) which is what we want From f758c4df24192a9e668acc6dee1d1e4cdad8f6db Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 24 Jan 2018 13:04:21 +0100 Subject: [PATCH 12/15] gofmt --- stacktest/fakemetrics/out/kafkamdm/kafkamdm.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stacktest/fakemetrics/out/kafkamdm/kafkamdm.go b/stacktest/fakemetrics/out/kafkamdm/kafkamdm.go index 844895765b..e17619a05d 100644 --- a/stacktest/fakemetrics/out/kafkamdm/kafkamdm.go +++ b/stacktest/fakemetrics/out/kafkamdm/kafkamdm.go @@ -9,8 +9,8 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/grafana/metrictank/stacktest/fakemetrics/out" p "github.com/grafana/metrictank/cluster/partitioner" + "github.com/grafana/metrictank/stacktest/fakemetrics/out" "github.com/raintank/met" "github.com/raintank/worldping-api/pkg/log" "gopkg.in/raintank/schema.v1" From a14972e3d4354f7462f86edc61b73a821f4ca38c Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 24 Jan 2018 13:08:37 +0100 Subject: [PATCH 13/15] bump thresholds for CircleCI --- 
stacktest/tests/end2end_carbon/end2end_carbon_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stacktest/tests/end2end_carbon/end2end_carbon_test.go b/stacktest/tests/end2end_carbon/end2end_carbon_test.go index 3b9f11c318..f4de09a20f 100644 --- a/stacktest/tests/end2end_carbon/end2end_carbon_test.go +++ b/stacktest/tests/end2end_carbon/end2end_carbon_test.go @@ -70,7 +70,7 @@ func TestStartup(t *testing.T) { case <-tracker.Match(matchers): fmt.Println("stack now running.") fmt.Println("Go to http://localhost:3000 (and login as admin:admin) to see what's going on") - case <-time.After(time.Second * 40): + case <-time.After(time.Second * 70): grafana.PostAnnotation("TestStartup:FAIL") t.Fatal("timed out while waiting for all metrictank instances to come up") } From 522420b70fe5511eaef85502f638bc5647e3878f Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 24 Jan 2018 13:46:25 +0100 Subject: [PATCH 14/15] better logging so you can see timing info everywhere needed --- .../end2end_carbon/end2end_carbon_test.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/stacktest/tests/end2end_carbon/end2end_carbon_test.go b/stacktest/tests/end2end_carbon/end2end_carbon_test.go index f4de09a20f..792ab666b1 100644 --- a/stacktest/tests/end2end_carbon/end2end_carbon_test.go +++ b/stacktest/tests/end2end_carbon/end2end_carbon_test.go @@ -1,7 +1,6 @@ package end2end_carbon import ( - "fmt" "log" "os" "os/exec" @@ -25,7 +24,7 @@ var fm *fakemetrics.FakeMetrics const metricsPerSecond = 1000 func TestMain(m *testing.M) { - fmt.Println("launching docker-dev stack...") + log.Println("launching docker-dev stack...") cmd := exec.Command(docker.Path("docker/launch.sh"), "docker-dev") var err error @@ -42,19 +41,20 @@ func TestMain(m *testing.M) { retcode := m.Run() fm.Close() - fmt.Println("stopping docker-compose stack...") + log.Println("stopping docker-compose stack...") cmd.Process.Signal(syscall.SIGINT) // note: even when we don't care about the output, it's best to consume it before calling cmd.Wait() // even though the cmd.Wait docs say it will wait for stdout/stderr copying to complete // however the docs for cmd.StdoutPipe say "it is incorrect to call Wait before all reads from the pipe have completed" tracker.Wait() - if err := cmd.Wait(); err != nil { - // 130 means ctrl-C (interrupt) which is what we want - if err.Error() == "exit status 130" { - os.Exit(retcode) - } + err = cmd.Wait() + + // 130 means ctrl-C (interrupt) which is what we want + if err != nil && err.Error() != "exit status 130" { log.Printf("ERROR: could not cleanly shutdown running docker-compose command: %s", err) retcode = 1 + } else { + log.Println("docker-compose stack is shut down") } os.Exit(retcode) @@ -68,8 +68,8 @@ func TestStartup(t *testing.T) { } select { case <-tracker.Match(matchers): - fmt.Println("stack now running.") - fmt.Println("Go to http://localhost:3000 (and login as admin:admin) to see what's going on") + log.Println("stack now running.") + log.Println("Go to http://localhost:3000 (and login as admin:admin) to see what's going on") case <-time.After(time.Second * 70): grafana.PostAnnotation("TestStartup:FAIL") t.Fatal("timed out while waiting for all metrictank instances to come up") From 337ff41aaea33cd73adc153c16017386c765e846 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Wed, 11 Apr 2018 10:04:37 +0300 Subject: [PATCH 15/15] update metric name --- stacktest/tests/end2end_carbon/end2end_carbon_test.go | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/stacktest/tests/end2end_carbon/end2end_carbon_test.go b/stacktest/tests/end2end_carbon/end2end_carbon_test.go index 792ab666b1..b82c21efb1 100644 --- a/stacktest/tests/end2end_carbon/end2end_carbon_test.go +++ b/stacktest/tests/end2end_carbon/end2end_carbon_test.go @@ -81,9 +81,9 @@ func TestBaseIngestWorkload(t *testing.T) { fm = fakemetrics.NewCarbon(metricsPerSecond) - suc6, resp := graphite.RetryGraphite8080("perSecond(metrictank.stats.docker-env.*.input.carbon.metrics_received.counter32)", "-8s", 18, func(resp graphite.Response) bool { + suc6, resp := graphite.RetryGraphite8080("perSecond(metrictank.stats.docker-env.*.input.carbon.metricdata.received.counter32)", "-8s", 18, func(resp graphite.Response) bool { exp := []string{ - "perSecond(metrictank.stats.docker-env.default.input.carbon.metrics_received.counter32)", + "perSecond(metrictank.stats.docker-env.default.input.carbon.metricdata.received.counter32)", } a := graphite.ValidateTargets(exp)(resp) b := graphite.ValidatorLenNulls(1, 8)(resp)
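
The FakeMetrics change above boils down to a common Go lifecycle pattern: a value owns its producer goroutine and exposes Close to stop it, instead of package-level state driven by a bare goroutine. What follows is a minimal standalone sketch of that pattern, not taken from any commit above; a time.Ticker and a plain flush function stand in for clock.AlignedTick and the out.Out interface.

package main

import (
	"fmt"
	"time"
)

// producer flushes once per tick until Close is called.
type producer struct {
	flush func(ts int64) error // stand-in for out.Out.Flush
	close chan struct{}
}

func newProducer(flush func(int64) error) *producer {
	p := &producer{flush: flush, close: make(chan struct{})}
	go p.run()
	return p
}

func (p *producer) run() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-p.close:
			return
		case tick := <-ticker.C:
			if err := p.flush(tick.Unix()); err != nil {
				panic(err)
			}
		}
	}
}

// Close stops the run loop.
func (p *producer) Close() {
	p.close <- struct{}{}
}

func main() {
	p := newProducer(func(ts int64) error {
		fmt.Println("flush at", ts)
		return nil
	})
	time.Sleep(3 * time.Second)
	p.Close()
}

Close here only signals the loop; the real FakeMetrics.Close additionally closes its output, which is why the carbon output in the same patch gains a closed flag and an errClosed error instead of writing to a closed connection.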
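
The shutdown sequence that PATCH 09 and PATCH 11 converge on — start the stack with plain exec.Command, consume the pipes, send SIGINT instead of relying on context cancellation (which kills with SIGKILL), and only then call Wait — can be sketched on its own roughly as below. This is not the test code itself; the command name and the sleep are placeholders for launching the stack and running the tests.

package main

import (
	"bufio"
	"log"
	"os/exec"
	"syscall"
	"time"
)

func main() {
	cmd := exec.Command("docker-compose", "up") // placeholder long-running command
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}

	done := make(chan struct{})
	go func() {
		// drain stdout before Wait: the os/exec docs for StdoutPipe say it is
		// incorrect to call Wait before all reads from the pipe have completed
		scanner := bufio.NewScanner(stdout)
		for scanner.Scan() {
			log.Println("stdout:", scanner.Text())
		}
		close(done)
	}()

	time.Sleep(5 * time.Second) // stand-in for running the tests

	// ask the process to shut down gracefully (ctrl-C equivalent) rather than SIGKILL
	cmd.Process.Signal(syscall.SIGINT)
	<-done // all reads finished
	if err := cmd.Wait(); err != nil && err.Error() != "exit status 130" {
		// the patches above treat exit status 130 (128 + SIGINT) as a clean ctrl-C shutdown
		log.Printf("could not cleanly shut down: %s", err)
	}
}

Sending SIGINT mirrors pressing ctrl-C on docker-compose, which is why an exit status of 130 is accepted as a clean shutdown rather than reported as an error.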