From 371a8ab1b6e18bbcdc9cc0e6970aca539f9ae1c4 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 26 Jun 2023 10:37:18 +0200 Subject: [PATCH 01/23] Initialize Metricstore --- cmd/benchmark.go | 45 +++++++++++++++++-- .../benchrunner/runners/system/metrics.go | 13 +++++- .../benchrunner/runners/system/options.go | 24 ++++++---- internal/benchrunner/runners/system/runner.go | 1 + internal/elasticsearch/client.go | 14 ++++++ 5 files changed, 84 insertions(+), 13 deletions(-) diff --git a/cmd/benchmark.go b/cmd/benchmark.go index ce13fe53cf..622ad4abe0 100644 --- a/cmd/benchmark.go +++ b/cmd/benchmark.go @@ -5,8 +5,10 @@ package cmd import ( + "context" "errors" "fmt" + "os" "strings" "time" @@ -14,6 +16,7 @@ import ( "github.com/elastic/elastic-package/internal/corpusgenerator" "github.com/elastic/elastic-package/internal/kibana" + "github.com/elastic/elastic-package/internal/logger" "github.com/spf13/cobra" @@ -264,7 +267,7 @@ func systemCommandAction(cmd *cobra.Command, args []string) error { return fmt.Errorf("can't create Kibana client: %w", err) } - opts := system.NewOptions( + withOpts := []system.OptionFunc{ system.WithBenchmarkName(benchName), system.WithDeferCleanup(deferCleanup), system.WithMetricsInterval(metricsInterval), @@ -272,8 +275,17 @@ func systemCommandAction(cmd *cobra.Command, args []string) error { system.WithPackageRootPath(packageRootPath), system.WithESAPI(esClient.API), system.WithKibanaClient(kc), - ) - runner := system.NewSystemBenchmark(opts) + } + + esMetricsClient, err := initializeESMetricsClient(cmd.Context()) + if err != nil { + return fmt.Errorf("can't create Elasticsearch metrics client: %w", err) + } + if esMetricsClient != nil { + withOpts = append(withOpts, system.WithESMetricsAPI(esMetricsClient.API)) + } + + runner := system.NewSystemBenchmark(system.NewOptions(withOpts...)) r, err := benchrunner.Run(runner) if err != nil { @@ -367,3 +379,30 @@ func generateDataStreamCorpusCommandAction(cmd *cobra.Command, _ []string) error return nil } + +func initializeESMetricsClient(ctx context.Context) (*elasticsearch.Client, error) { + address := os.Getenv(system.ESMetricstoreHostEnv) + user := os.Getenv(system.ESMetricstoreUsernameEnv) + pass := os.Getenv(system.ESMetricstorePasswordEnv) + cacert := os.Getenv(system.ESMetricstoreCACertificateEnv) + if address == "" || user == "" || pass == "" { + logger.Debugf("can't initialize metricstore, missing environment configuration") + return nil, nil + } + + esClient, err := elasticsearch.NewClient( + elasticsearch.OptionWithAddress(address), + elasticsearch.OptionWithUsername(user), + elasticsearch.OptionWithPassword(pass), + elasticsearch.OptionWithCertificateAuthority(cacert), + ) + if err != nil { + return nil, err + } + + if err := esClient.CheckHealth(ctx); err != nil { + return nil, err + } + + return esClient, nil +} diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go index 40edae2c1e..85a867044e 100644 --- a/internal/benchrunner/runners/system/metrics.go +++ b/internal/benchrunner/runners/system/metrics.go @@ -11,15 +11,24 @@ import ( "github.com/elastic/elastic-package/internal/benchrunner/runners/system/servicedeployer" "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/elasticsearch/ingest" + "github.com/elastic/elastic-package/internal/environment" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/signal" ) +var ( + ESMetricstoreHostEnv = 
environment.WithElasticPackagePrefix("ESMETRICSTORE_HOST")
+	ESMetricstoreUsernameEnv      = environment.WithElasticPackagePrefix("ESMETRICSTORE_USERNAME")
+	ESMetricstorePasswordEnv      = environment.WithElasticPackagePrefix("ESMETRICSTORE_PASSWORD")
+	ESMetricstoreCACertificateEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_CA_CERT")
+)
+
 type collector struct {
 	ctxt    servicedeployer.ServiceContext
 	warmupD time.Duration
 	interval time.Duration
 	esapi    *elasticsearch.API
+	msapi    *elasticsearch.API
 	datastream string
 	pipelinePrefix string
@@ -54,8 +63,8 @@ type metricsSummary struct {
 
 func newCollector(
 	ctxt servicedeployer.ServiceContext,
-	esapi *elasticsearch.API,
+	esapi, msapi *elasticsearch.API,
 	interval, warmup time.Duration,
 	datastream, pipelinePrefix string,
 ) *collector {
 	return &collector{
@@ -63,6 +72,7 @@ func newCollector(
 		interval: interval,
 		warmupD:  warmup,
 		esapi:    esapi,
+		msapi:    msapi,
 		datastream:     datastream,
 		pipelinePrefix: pipelinePrefix,
 		stopC:          make(chan struct{}, 1),
diff --git a/internal/benchrunner/runners/system/options.go b/internal/benchrunner/runners/system/options.go
index afdb22e470..a8be2fa957 100644
--- a/internal/benchrunner/runners/system/options.go
+++ b/internal/benchrunner/runners/system/options.go
@@ -13,14 +13,14 @@ import (
 
 // Options contains benchmark runner options.
 type Options struct {
-	ESAPI            *elasticsearch.API
-	KibanaClient     *kibana.Client
-	DeferCleanup     time.Duration
-	MetricsInterval  time.Duration
-	ReindexData      bool
-	MetricstoreESURL string
-	BenchName        string
-	PackageRootPath  string
+	ESAPI           *elasticsearch.API
+	KibanaClient    *kibana.Client
+	DeferCleanup    time.Duration
+	MetricsInterval time.Duration
+	ReindexData     bool
+	ESMetricsAPI    *elasticsearch.API
+	BenchName       string
+	PackageRootPath string
 }
 
 type OptionFunc func(*Options)
@@ -62,13 +62,21 @@ func WithDeferCleanup(d time.Duration) OptionFunc {
 		opts.DeferCleanup = d
 	}
 }
+
 func WithMetricsInterval(d time.Duration) OptionFunc {
 	return func(opts *Options) {
 		opts.MetricsInterval = d
 	}
 }
+
 func WithDataReindexing(b bool) OptionFunc {
 	return func(opts *Options) {
 		opts.ReindexData = b
 	}
 }
+
+func WithESMetricsAPI(api *elasticsearch.API) OptionFunc {
+	return func(opts *Options) {
+		opts.ESMetricsAPI = api
+	}
+}
diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go
index 9f0f00a3ef..4e1a7fce52 100644
--- a/internal/benchrunner/runners/system/runner.go
+++ b/internal/benchrunner/runners/system/runner.go
@@ -296,6 +296,7 @@ func (r *runner) startMetricsColletion() {
 	r.mcollector = newCollector(
 		r.ctxt,
 		r.options.ESAPI,
+		r.options.ESMetricsAPI,
 		r.options.MetricsInterval,
 		r.scenario.WarmupTimePeriod,
 		r.runtimeDataStream,
diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go
index 4b6a8ae070..9bec94ca29 100644
--- a/internal/elasticsearch/client.go
+++ b/internal/elasticsearch/client.go
@@ -66,6 +66,20 @@ func OptionWithAddress(address string) ClientOption {
 	}
 }
 
+// OptionWithUsername sets the username to be used by the client.
+func OptionWithUsername(username string) ClientOption {
+	return func(opts *clientOptions) {
+		opts.username = username
+	}
+}
+
+// OptionWithPassword sets the password to be used by the client.
+func OptionWithPassword(password string) ClientOption {
+	return func(opts *clientOptions) {
+		opts.password = password
+	}
+}
+
 // OptionWithCertificateAuthority sets the certificate authority to be used by the client.
 
func OptionWithCertificateAuthority(certificateAuthority string) ClientOption { return func(opts *clientOptions) { From b5189b589797a19e6776f9edc2764e6c04d1380a Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 26 Jun 2023 10:42:24 +0200 Subject: [PATCH 02/23] Fix package policy Var placement --- internal/benchrunner/runners/system/runner.go | 2 +- internal/kibana/policies.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 4e1a7fce52..3993ff302f 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -379,11 +379,11 @@ func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *k pp := kibana.PackagePolicy{ Namespace: "ep", PolicyID: p.ID, - Vars: r.scenario.Vars, Force: true, Inputs: map[string]kibana.PackagePolicyInput{ fmt.Sprintf("%s-%s", pkgManifest.PolicyTemplates[0].Name, r.scenario.Input): { Enabled: true, + Vars: r.scenario.Vars, Streams: map[string]kibana.PackagePolicyStream{ fmt.Sprintf("%s.%s", pkgManifest.Name, r.scenario.DataStream.Name): { Enabled: true, diff --git a/internal/kibana/policies.go b/internal/kibana/policies.go index da0fd52ab5..8ed3c37029 100644 --- a/internal/kibana/policies.go +++ b/internal/kibana/policies.go @@ -227,13 +227,13 @@ type PackagePolicy struct { Name string `json:"name"` Version string `json:"version"` } `json:"package"` - Vars map[string]interface{} `json:"vars,omitempty"` Inputs map[string]PackagePolicyInput `json:"inputs,omitempty"` Force bool `json:"force"` } type PackagePolicyInput struct { Enabled bool `json:"enabled"` + Vars map[string]interface{} `json:"vars,omitempty"` Streams map[string]PackagePolicyStream `json:"streams,omitempty"` } From 19268a36ca37b150fcf01a7444e9cf095061068d Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 26 Jun 2023 10:45:23 +0200 Subject: [PATCH 03/23] Add reindex functionality --- docs/howto/system_benchmarking.md | 2 +- internal/benchrunner/runners/system/runner.go | 157 +++++++++++++++++- 2 files changed, 157 insertions(+), 2 deletions(-) diff --git a/docs/howto/system_benchmarking.md b/docs/howto/system_benchmarking.md index 2db242ad59..8d453d3686 100644 --- a/docs/howto/system_benchmarking.md +++ b/docs/howto/system_benchmarking.md @@ -21,7 +21,7 @@ Conceptually, running a system benchmark involves the following steps: the benchmark will continue until the number of documents is not changed in the data stream. 1. Metrics collection ends and a summary report is created. 1. Delete test artifacts and tear down the instance of the package's integration service. -1. **TODO**: Optionally reindex all ingested data into the ES Metricstore for further analysis. +1. Optionally reindex all ingested data into the ES Metricstore for further analysis. 1. **TODO**: Optionally compare results against another benchmark run. 
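A note on where the reindexed data goes: `reindexData` below writes each run into its own `bench-reindex-<data stream>-<run id>` index and stamps every document with a `benchmark_metadata` object, so a single run can be isolated afterwards. The sketch below shows one such analysis query — `countReindexedDocs` is a hypothetical helper, not part of this patch series; it assumes only the `bench-reindex-*` index pattern and the `benchmark_metadata.info.run_id` field produced by the code that follows.

```go
// Hypothetical helper, not part of these patches: count the documents that a
// single benchmark run reindexed into the metricstore. The "bench-reindex-"
// index prefix and the benchmark_metadata.info.run_id field are produced by
// reindexData and benchMeta in patch 03 below.
func countReindexedDocs(msapi *elasticsearch.API, runID string) (int, error) {
	query := fmt.Sprintf(`{"query":{"match":{"benchmark_metadata.info.run_id":%q}}}`, runID)
	resp, err := msapi.Count(
		msapi.Count.WithIndex("bench-reindex-*"),
		msapi.Count.WithBody(strings.NewReader(query)),
	)
	if err != nil {
		return 0, fmt.Errorf("count request failed: %w", err)
	}
	defer resp.Body.Close()

	var body struct {
		Count int `json:"count"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		return 0, fmt.Errorf("error decoding count response: %w", err)
	}
	return body.Count, nil
}
```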
 ## Defining a system benchmark scenario
 
diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go
index 3993ff302f..022d8d78ce 100644
--- a/internal/benchrunner/runners/system/runner.go
+++ b/internal/benchrunner/runners/system/runner.go
@@ -285,7 +285,9 @@ func (r *runner) run() (report reporters.Reportable, err error) {
 		return nil, fmt.Errorf("can't summarize metrics: %w", err)
 	}
 
-	// TODO reindex if configured and es metricstore is set
+	if err := r.reindexData(); err != nil {
+		return nil, fmt.Errorf("can't reindex data: %w", err)
+	}
 
 	return createReport(r.options.BenchName, r.corporaFile, r.scenario, msum)
 }
@@ -692,6 +694,159 @@ func (r *runner) enrollAgents() error {
 	return nil
 }
 
+func (r *runner) reindexData() error {
+	if !r.options.ReindexData {
+		return nil
+	}
+	if r.options.ESMetricsAPI == nil {
+		return errors.New("the option to reindex data is set, but the metricstore was not initialized")
+	}
+
+	logger.Debug("starting reindexing of data...")
+
+	logger.Debug("getting original mappings...")
+	// Get the mapping from the source data stream
+	mappingRes, err := r.options.ESAPI.Indices.GetMapping(
+		r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream),
+	)
+	if err != nil {
+		return fmt.Errorf("error getting mapping: %w", err)
+	}
+	defer mappingRes.Body.Close()
+
+	body, err := io.ReadAll(mappingRes.Body)
+	if err != nil {
+		return fmt.Errorf("error reading mapping body: %w", err)
+	}
+
+	mappings := map[string]struct {
+		Mappings json.RawMessage
+	}{}
+
+	if err := json.Unmarshal(body, &mappings); err != nil {
+		return fmt.Errorf("error unmarshaling mappings: %w", err)
+	}
+
+	if len(mappings) != 1 {
+		return fmt.Errorf("exactly 1 mapping was expected, got %d", len(mappings))
+	}
+
+	var mapping string
+	for _, v := range mappings {
+		mapping = string(v.Mappings)
+	}
+
+	reader := bytes.NewReader(
+		[]byte(fmt.Sprintf(`{
+			"settings": {"number_of_replicas":0},
+			"mappings": %s
+		}`, mapping)),
+	)
+
+	indexName := fmt.Sprintf("bench-reindex-%s-%s", r.runtimeDataStream, r.ctxt.Bench.RunID)
+
+	logger.Debugf("creating %s index in metricstore...", indexName)
+
+	createRes, err := r.options.ESMetricsAPI.Indices.Create(
+		indexName,
+		r.options.ESMetricsAPI.Indices.Create.WithBody(reader),
+	)
+	if err != nil {
+		return fmt.Errorf("could not create index: %w", err)
+	}
+	defer createRes.Body.Close()
+
+	if createRes.IsError() {
+		return errors.New("got a response error while creating index")
+	}
+
+	bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`)
+
+	logger.Debug("starting scrolling of events...")
+	res, err := r.options.ESAPI.Search(
+		r.options.ESAPI.Search.WithIndex(r.runtimeDataStream),
+		r.options.ESAPI.Search.WithBody(bodyReader),
+		r.options.ESAPI.Search.WithScroll(time.Minute),
+		r.options.ESAPI.Search.WithSize(10000),
+	)
+	if err != nil {
+		return fmt.Errorf("error executing search: %w", err)
+	}
+	defer res.Body.Close()
+
+	// Iterate through the search results using the Scroll API
+	for {
+		var sr map[string]interface{}
+		if err := json.NewDecoder(res.Body).Decode(&sr); err != nil {
+			return fmt.Errorf("error decoding search response: %w", err)
+		}
+
+		resErr, found := sr["error"]
+		if found {
+			errStr := resErr.(map[string]interface{})["reason"].(string)
+			return fmt.Errorf("error searching for documents: %s", errStr)
+		}
+
+		hits, found := sr["hits"].(map[string]interface{})["hits"].([]interface{})
+		if !found || len(hits) == 0 {
+			break
+		}
+
+		var bulkBodyBuilder strings.Builder
+		for _, 
hit := range hits {
+			bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.(map[string]interface{})["_id"]))
+			enriched := r.enrichEventWithBenchmarkMetadata(hit.(map[string]interface{})["_source"].(map[string]interface{}))
+			src, err := json.Marshal(enriched)
+			if err != nil {
+				return fmt.Errorf("error marshaling enriched _source: %w", err)
+			}
+			bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
+		}
+
+		logger.Debugf("bulk request of %d events...", len(hits))
+
+		bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
+		if err != nil {
+			return fmt.Errorf("error performing the bulk index request: %w", err)
+		}
+		bulkRes.Body.Close()
+
+		scrollId, found := sr["_scroll_id"].(string)
+		if !found {
+			return errors.New("error getting scroll ID")
+		}
+
+		res, err = r.options.ESAPI.Scroll(
+			r.options.ESAPI.Scroll.WithScrollID(scrollId),
+			r.options.ESAPI.Scroll.WithScroll(time.Minute),
+		)
+		if err != nil {
+			return fmt.Errorf("error executing scroll: %w", err)
+		}
+		defer res.Body.Close()
+	}
+
+	logger.Debug("reindexing operation finished")
+	return nil
+}
+
+type benchMeta struct {
+	Info struct {
+		Benchmark string `json:"benchmark"`
+		RunID     string `json:"run_id"`
+	} `json:"info"`
+	Parameters scenario `json:"parameter"`
+}
+
+func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[string]interface{} {
+	var m benchMeta
+	m.Info.Benchmark = r.options.BenchName
+	m.Info.RunID = r.ctxt.Bench.RunID
+	m.Parameters = *r.scenario
+	e["benchmark_metadata"] = m
+	return e
+}
+
 func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) {
 	resp, err := esapi.Count(
 		esapi.Count.WithIndex(dataStream),

From 2e88cf51c925b74d43a5e73066f6617bf9460787 Mon Sep 17 00:00:00 2001
From: Marc Guasch
Date: Thu, 29 Jun 2023 08:54:01 +0200
Subject: [PATCH 04/23] Refactor waitfordatatimeout usage

---
 .../benchrunner/runners/system/metrics.go     | 21 +++++++++++++------
 internal/benchrunner/runners/system/report.go |  2 +-
 internal/benchrunner/runners/system/runner.go | 13 +++--------
 .../benchrunner/runners/system/scenario.go    |  5 ++++-
 4 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go
index 85a867044e..8b4fe750f1 100644
--- a/internal/benchrunner/runners/system/metrics.go
+++ b/internal/benchrunner/runners/system/metrics.go
@@ -24,8 +24,10 @@ var (
 )
 
 type collector struct {
-	ctxt    servicedeployer.ServiceContext
-	warmupD time.Duration
+	ctxt     servicedeployer.ServiceContext
+	metadata benchMeta
+	scenario scenario
+
 	interval time.Duration
 	esapi    *elasticsearch.API
 	msapi    *elasticsearch.API
@@ -63,15 +65,20 @@ type metricsSummary struct {
 
 func newCollector(
 	ctxt servicedeployer.ServiceContext,
+	benchName string,
+	scenario scenario,
 	esapi, msapi *elasticsearch.API,
-	interval, warmup time.Duration,
+	interval time.Duration,
 	datastream, pipelinePrefix string,
 ) *collector {
+	meta := benchMeta{Parameters: scenario}
+	meta.Info.Benchmark = benchName
+	meta.Info.RunID = ctxt.Bench.RunID
 	return &collector{
 		ctxt:     ctxt,
 		interval: interval,
-		warmupD:  warmup,
+		scenario: scenario,
+		metadata: meta,
 		esapi:    esapi,
 		msapi:    msapi,
 		datastream:     datastream,
 		pipelinePrefix: pipelinePrefix,
 		stopC:          make(chan struct{}, 1),
@@ -217,9 +226,9 @@ readyLoop:
 		}
 	}
 
-	if c.warmupD > 0 {
-		logger.Debugf("waiting %s for warmup period", c.warmupD)
-		<-time.After(c.warmupD)
+	if c.scenario.WarmupTimePeriod > 0 {
+		logger.Debugf("waiting %s for warmup period", c.scenario.WarmupTimePeriod)
+		
<-time.After(c.scenario.WarmupTimePeriod) } logger.Debug("metric collection starting...") } diff --git a/internal/benchrunner/runners/system/report.go b/internal/benchrunner/runners/system/report.go index 5aa5430a21..ac27ca0817 100644 --- a/internal/benchrunner/runners/system/report.go +++ b/internal/benchrunner/runners/system/report.go @@ -78,7 +78,7 @@ func newReport(benchName, corporaFile string, s *scenario, sum *metricsSummary) report.Parameters.DataStream = s.DataStream report.Parameters.WarmupTimePeriod = s.WarmupTimePeriod report.Parameters.BenchmarkTimePeriod = s.BenchmarkTimePeriod - report.Parameters.WaitForDataTimeout = s.WaitForDataTimeout + report.Parameters.WaitForDataTimeout = *s.WaitForDataTimeout report.Parameters.Corpora = s.Corpora report.ClusterName = sum.ClusterName report.Nodes = sum.Nodes diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 022d8d78ce..8c8b2ae75f 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -41,10 +41,6 @@ const ( // are stored on the Agent container's filesystem. ServiceLogsAgentDir = "/tmp/service_logs" - waitForDataDefaultTimeout = 10 * time.Minute -) - -const ( // BenchType defining system benchmark BenchType benchrunner.Type = "system" ) @@ -297,10 +293,11 @@ func (r *runner) startMetricsColletion() { // TODO collect agent hosts metrics using system integration r.mcollector = newCollector( r.ctxt, + r.options.BenchName, + *r.scenario, r.options.ESAPI, r.options.ESMetricsAPI, r.options.MetricsInterval, - r.scenario.WarmupTimePeriod, r.runtimeDataStream, r.pipelinePrefix, ) @@ -609,10 +606,6 @@ func (r *runner) waitUntilBenchmarkFinishes() error { if r.scenario.BenchmarkTimePeriod > 0 { benchTime = time.NewTimer(r.scenario.BenchmarkTimePeriod) } - waitForDataTimeout := waitForDataDefaultTimeout - if r.scenario.WaitForDataTimeout > 0 { - waitForDataTimeout = r.scenario.WaitForDataTimeout - } oldHits := 0 _, err := waitUntilTrue(func() (bool, error) { @@ -641,7 +634,7 @@ func (r *runner) waitUntilBenchmarkFinishes() error { } return ret, err - }, waitForDataTimeout) + }, *r.scenario.WaitForDataTimeout) return err } diff --git a/internal/benchrunner/runners/system/scenario.go b/internal/benchrunner/runners/system/scenario.go index be10101701..3227ed5467 100644 --- a/internal/benchrunner/runners/system/scenario.go +++ b/internal/benchrunner/runners/system/scenario.go @@ -72,7 +72,10 @@ type corporaFields struct { } func defaultConfig() *scenario { - return &scenario{} + timeout := 10 * time.Minute + return &scenario{ + WaitForDataTimeout: &timeout, + } } func readConfig(path, scenario string, ctxt servicedeployer.ServiceContext) (*scenario, error) { From 8780b35e909b9d8db545c886b0e5d4488ed91eb7 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Tue, 27 Jun 2023 11:03:45 +0200 Subject: [PATCH 05/23] Fix filter agent policy id --- internal/benchrunner/runners/system/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 8c8b2ae75f..a2e25e2c8a 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -886,7 +886,7 @@ func filterAgents(allAgents []kibana.Agent) []kibana.Agent { switch { case agent.LocalMetadata.Host.Name == "docker-fleet-server", agent.PolicyID == "fleet-server-policy", - agent.PolicyID == "Elastic Cloud agent policy": + agent.PolicyID == 
"policy-elastic-agent-on-cloud": continue } filtered = append(filtered, agent) From 40cbb92e98bdd75b95bb98a0e96a5ae6b1a5a1ab Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 29 Jun 2023 08:54:01 +0200 Subject: [PATCH 06/23] Improve report --- internal/benchrunner/runners/system/report.go | 30 +++++++---- .../benchrunner/runners/system/scenario.go | 54 +++++++++---------- internal/elasticsearch/ingest/diskusage.go | 27 ++++------ 3 files changed, 57 insertions(+), 54 deletions(-) diff --git a/internal/benchrunner/runners/system/report.go b/internal/benchrunner/runners/system/report.go index ac27ca0817..4d6d57016a 100644 --- a/internal/benchrunner/runners/system/report.go +++ b/internal/benchrunner/runners/system/report.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/dustin/go-humanize" "github.com/jedib0t/go-pretty/table" "github.com/jedib0t/go-pretty/text" @@ -173,20 +174,31 @@ func reportHumanFormat(r *report) []byte { adu := du.AllFields report.WriteString(renderBenchmarkTable( fmt.Sprintf("disk usage for index %s (for all fields)", index), - "total", adu.Total, - "inverted_index.total", adu.InvertedIndex.Total, - "inverted_index.stored_fields", adu.StoredFields, - "inverted_index.doc_values", adu.DocValues, - "inverted_index.points", adu.Points, - "inverted_index.norms", adu.Norms, - "inverted_index.term_vectors", adu.TermVectors, - "inverted_index.knn_vectors", adu.KnnVectors, + "total", humanize.Bytes(adu.TotalInBytes), + "inverted_index.total", humanize.Bytes(adu.InvertedIndex.TotalInBytes), + "inverted_index.stored_fields", humanize.Bytes(adu.StoredFieldsInBytes), + "inverted_index.doc_values", humanize.Bytes(adu.DocValuesInBytes), + "inverted_index.points", humanize.Bytes(adu.PointsInBytes), + "inverted_index.norms", humanize.Bytes(adu.NormsInBytes), + "inverted_index.term_vectors", humanize.Bytes(adu.TermVectorsInBytes), + "inverted_index.knn_vectors", humanize.Bytes(adu.KnnVectorsInBytes), ) + "\n") } for node, pStats := range r.IngestPipelineStats { for pipeline, stats := range pStats { - var kvs []interface{} + if stats.Count == 0 { + continue + } + kvs := []interface{}{ + "Totals", + fmt.Sprintf( + "Count: %d | Failed: %d | Time: %s", + stats.Count, + stats.Failed, + time.Duration(stats.TimeInMillis)*time.Millisecond, + ), + } for _, procStats := range stats.Processors { str := fmt.Sprintf( "Count: %d | Failed: %d | Time: %s", diff --git a/internal/benchrunner/runners/system/scenario.go b/internal/benchrunner/runners/system/scenario.go index 3227ed5467..a81af67bf6 100644 --- a/internal/benchrunner/runners/system/scenario.go +++ b/internal/benchrunner/runners/system/scenario.go @@ -21,54 +21,54 @@ import ( const devPath = "_dev/benchmark/system" type scenario struct { - Package string `config:"package"` - Description string `config:"description"` - Version string `config:"version"` - Input string `config:"input"` - Vars map[string]interface{} `config:"vars"` - DataStream dataStream `config:"data_stream"` - WarmupTimePeriod time.Duration `config:"warmup_time_period"` - BenchmarkTimePeriod time.Duration `config:"benchmark_time_period"` - WaitForDataTimeout time.Duration `config:"wait_for_data_timeout"` - Corpora corpora `config:"corpora"` + Package string `config:"package" json:"package"` + Description string `config:"description" json:"description"` + Version string `config:"version" json:"version"` + Input string `config:"input" json:"input"` + Vars map[string]interface{} `config:"vars" json:"vars"` + DataStream dataStream `config:"data_stream" 
json:"data_stream"` + WarmupTimePeriod time.Duration `config:"warmup_time_period" json:"warmup_time_period"` + BenchmarkTimePeriod time.Duration `config:"benchmark_time_period" json:"benchmark_time_period"` + WaitForDataTimeout *time.Duration `config:"wait_for_data_timeout" json:"wait_for_data_timeout"` + Corpora corpora `config:"corpora" json:"corpora"` } type dataStream struct { - Name string `config:"name"` - Vars map[string]interface{} `config:"vars"` + Name string `config:"name" json:"name"` + Vars map[string]interface{} `config:"vars" json:"vars"` } type corpora struct { - Generator *generator `config:"generator"` - InputService *inputService `config:"input_service"` + Generator *generator `config:"generator" json:"generator"` + InputService *inputService `config:"input_service" json:"input_service"` } type inputService struct { - Name string `config:"name"` - Signal string `config:"signal"` + Name string `config:"name" json:"name"` + Signal string `config:"signal" json:"signal"` } type generator struct { - Size string `config:"size"` - Template corporaTemplate `config:"template"` - Config corporaConfig `config:"config"` - Fields corporaFields `config:"fields"` + Size string `config:"size" json:"size"` + Template corporaTemplate `config:"template" json:"template"` + Config corporaConfig `config:"config" json:"config"` + Fields corporaFields `config:"fields" json:"fields"` } type corporaTemplate struct { - Raw string `config:"raw"` - Path string `config:"path"` - Type string `config:"type"` + Raw string `config:"raw" json:"raw"` + Path string `config:"path" json:"path"` + Type string `config:"type" json:"type"` } type corporaConfig struct { - Raw map[string]interface{} `config:"raw"` - Path string `config:"path"` + Raw map[string]interface{} `config:"raw" json:"raw"` + Path string `config:"path" json:"path"` } type corporaFields struct { - Raw map[string]interface{} `config:"raw"` - Path string `config:"path"` + Raw map[string]interface{} `config:"raw" json:"raw"` + Path string `config:"path" json:"path"` } func defaultConfig() *scenario { diff --git a/internal/elasticsearch/ingest/diskusage.go b/internal/elasticsearch/ingest/diskusage.go index b6be4f644d..7c8f334b1a 100644 --- a/internal/elasticsearch/ingest/diskusage.go +++ b/internal/elasticsearch/ingest/diskusage.go @@ -13,29 +13,20 @@ import ( ) type DiskUsageStat struct { - Total string `json:"total"` - TotalInBytes int `json:"total_in_bytes"` + TotalInBytes uint64 `json:"total_in_bytes"` InvertedIndex struct { - Total string `json:"total"` - TotalInBytes int `json:"total_in_bytes"` + TotalInBytes uint64 `json:"total_in_bytes"` } `json:"inverted_index"` - StoredFields string `json:"stored_fields"` - StoredFieldsInBytes int `json:"stored_fields_in_bytes"` - DocValues string `json:"doc_values"` - DocValuesInBytes int `json:"doc_values_in_bytes"` - Points string `json:"points"` - PointsInBytes int `json:"points_in_bytes"` - Norms string `json:"norms"` - NormsInBytes int `json:"norms_in_bytes"` - TermVectors string `json:"term_vectors"` - TermVectorsInBytes int `json:"term_vectors_in_bytes"` - KnnVectors string `json:"knn_vectors"` - KnnVectorsInBytes int `json:"knn_vectors_in_bytes"` + StoredFieldsInBytes uint64 `json:"stored_fields_in_bytes"` + DocValuesInBytes uint64 `json:"doc_values_in_bytes"` + PointsInBytes uint64 `json:"points_in_bytes"` + NormsInBytes uint64 `json:"norms_in_bytes"` + TermVectorsInBytes uint64 `json:"term_vectors_in_bytes"` + KnnVectorsInBytes uint64 `json:"knn_vectors_in_bytes"` } type DiskUsage struct { - 
StoreSize string `json:"store_size"`
-	StoreSizeInBytes int    `json:"store_size_in_bytes"`
+	StoreSizeInBytes uint64 `json:"store_size_in_bytes"`
 	AllFields DiskUsageStat            `json:"all_fields"`
 	Fields    map[string]DiskUsageStat `json:"fields"`
 }

From 71189465ec7817820aee34f622d594b93242757e Mon Sep 17 00:00:00 2001
From: Marc Guasch
Date: Thu, 29 Jun 2023 08:54:48 +0200
Subject: [PATCH 07/23] Collect metrics to metricstore if configured

---
 docs/howto/system_benchmarking.md             |   2 +-
 internal/benchrunner/runner.go                |  19 +-
 .../benchrunner/runners/system/metrics.go     | 162 ++++++++++++++++--
 internal/benchrunner/runners/system/runner.go |   1 -
 4 files changed, 160 insertions(+), 24 deletions(-)

diff --git a/docs/howto/system_benchmarking.md b/docs/howto/system_benchmarking.md
index 8d453d3686..df6bbe5d89 100644
--- a/docs/howto/system_benchmarking.md
+++ b/docs/howto/system_benchmarking.md
@@ -13,7 +13,7 @@ Conceptually, running a system benchmark involves the following steps:
 1. Create a benchmark policy that configures a single data stream for a single package.
 1. Assign the policy to the enrolled Agent(s).
 1. Metrics collection from the cluster starts. (**TODO**: record metrics from all Elastic Agents involved using the `system` integration.)
-1. **TODO**: Send the collected metrics to the ES Metricstore if set.
+1. Send the collected metrics to the ES Metricstore if set.
 1. Generate data if configured (it uses the [corpus-generator-tool](https://github.com/elastic/elastic-integration-corpus-generator-tool))
 1. Wait a reasonable amount of time for the Agent to collect data from the
    integration service and index it into the correct Elasticsearch data stream.
diff --git a/internal/benchrunner/runner.go b/internal/benchrunner/runner.go
index 00399d14d7..30058cd009 100644
--- a/internal/benchrunner/runner.go
+++ b/internal/benchrunner/runner.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 
 	"github.com/elastic/elastic-package/internal/benchrunner/reporters"
+	"github.com/elastic/elastic-package/internal/logger"
 )
 
 // Type represents the various supported benchmark types
@@ -26,20 +27,26 @@ func Run(runner Runner) (reporters.Reportable, error) {
 		return nil, errors.New("a runner is required")
 	}
 
+	defer func() {
+		// we want to ensure correct tear down of the benchmark in any situation
+		if rerr := recover(); rerr != nil {
+			logger.Errorf("panic occurred: %v", rerr)
+		}
+
+		tdErr := runner.TearDown()
+		if tdErr != nil {
+			logger.Errorf("could not teardown benchmark runner: %v", tdErr)
+		}
+	}()
+
 	if err := runner.SetUp(); err != nil {
 		return nil, fmt.Errorf("could not set up benchmark runner: %w", err)
 	}
 
 	report, err := runner.Run()
-	tdErr := runner.TearDown()
-
 	if err != nil {
 		return nil, fmt.Errorf("could not complete benchmark run: %w", err)
 	}
 
-	if tdErr != nil {
-		return report, fmt.Errorf("could not teardown benchmark runner: %w", tdErr)
-	}
-
 	return report, nil
 }
diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go
index 8b4fe750f1..3a5434883d 100644
--- a/internal/benchrunner/runners/system/metrics.go
+++ b/internal/benchrunner/runners/system/metrics.go
@@ -5,6 +5,9 @@
 package system
 
 import (
+	"bytes"
+	"encoding/json"
+	"fmt"
 	"sync"
 	"time"
 
@@ -39,7 +42,8 @@ type collector struct {
 
 	startIngestMetrics map[string]ingest.PipelineStatsMap
 	endIngestMetrics   map[string]ingest.PipelineStatsMap
-	collectedMetrics   []metrics
+	startMetrics       metrics
+	endMetrics         metrics
 	diskUsage          map[string]ingest.DiskUsage
 	startTotalHits     int
 	endTotalHits       int
@@ -61,6 +65,7 @@ type 
metricsSummary struct {
 	IngestPipelineStats map[string]ingest.PipelineStatsMap
 	DiskUsage           map[string]ingest.DiskUsage
 	TotalHits           int
+	NodesStats          map[string]ingest.NodeStats
 }
 
 func newCollector(
@@ -83,26 +88,30 @@ func newCollector(
 		msapi:          msapi,
 		datastream:     datastream,
 		pipelinePrefix: pipelinePrefix,
-		stopC:          make(chan struct{}, 1),
+		stopC:          make(chan struct{}),
 	}
 }
 
 func (c *collector) start() {
 	c.tick = time.NewTicker(c.interval)
+	c.createMetricsIndex()
+
+	var once sync.Once
 	go func() {
-		var once sync.Once
 		once.Do(c.waitUntilReady)
 
 		defer c.tick.Stop()
 
 		c.startIngestMetrics = c.collectIngestMetrics()
 		c.startTotalHits = c.collectTotalHits()
+		c.startMetrics = c.collect()
+		c.publish(c.createEventsFromMetrics(c.startMetrics))
 
 		for {
 			if signal.SIGINT() {
 				logger.Debug("SIGINT: cancel metrics collection")
 				c.collectMetricsPreviousToStop()
+				c.publish(c.createEventsFromMetrics(c.endMetrics))
 				return
 			}
 
 			select {
 			case <-c.stopC:
 				// last collect before stopping
 				c.collectMetricsPreviousToStop()
+				c.publish(c.createEventsFromMetrics(c.endMetrics))
 				c.stopC <- struct{}{}
 				return
 			case <-c.tick.C:
-				c.collect()
+				m := c.collect()
+				c.publish(c.createEventsFromMetrics(m))
 			}
 		}
 	}()
@@ -125,7 +136,7 @@ func (c *collector) stop() {
 	close(c.stopC)
 }
 
-func (c *collector) collect() {
+func (c *collector) collect() metrics {
 	m := metrics{
 		ts: time.Now().Unix(),
 	}
@@ -144,24 +155,99 @@ func (c *collector) collect() {
 		m.dsMetrics = dsstats
 	}
 
-	c.collectedMetrics = append(c.collectedMetrics, m)
+	return m
+}
+
+func (c *collector) publish(events [][]byte) {
+	if c.msapi == nil {
+		return
+	}
+	for _, e := range events {
+		reqBody := bytes.NewReader(e)
+		resp, err := c.msapi.Index(c.indexName(), reqBody)
+		if err != nil {
+			logger.Debugf("error indexing metrics: %v", err)
+			continue
+		}
+		var sr map[string]interface{}
+		if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil {
+			logger.Debugf("error decoding index response: %v", err)
+		}
+
+		resErr, found := sr["error"]
+		if found {
+			errStr := resErr.(map[string]interface{})["reason"].(string)
+			logger.Debugf("error response while indexing metrics: %s", errStr)
+		}
+	}
+}
+
+func (c *collector) createMetricsIndex() {
+	if c.msapi == nil {
+		return
+	}
+	reader := bytes.NewReader(
+		[]byte(`{
+		"settings": {
+			"number_of_replicas": 0
+		},
+		"mappings": {
+			"dynamic_templates": [
+				{
+					"strings_as_keyword": {
+						"match_mapping_type": "string",
+						"mapping": {
+							"ignore_above": 1024,
+							"type": "keyword"
+						}
+					}
+				}
+			],
+			"date_detection": false,
+			"properties": {
+				"@timestamp": {
+					"type": "date"
+				}
+			}
+		}
+	}`),
+	)
+
+	logger.Debugf("creating %s index in metricstore...", c.indexName())
+
+	createRes, err := c.msapi.Indices.Create(
+		c.indexName(),
+		c.msapi.Indices.Create.WithBody(reader),
+	)
+	if err != nil {
+		logger.Debugf("could not create index: %v", err)
+		return
+	}
+	createRes.Body.Close()
+
+	if createRes.IsError() {
+		logger.Debug("got a response error while creating index")
+	}
+}
+
+func (c *collector) indexName() string {
+	return fmt.Sprintf("bench-metrics-%s-%s", c.datastream, c.ctxt.Bench.RunID)
+}
 
 func (c *collector) summarize() (*metricsSummary, error) {
 	sum := metricsSummary{
 		RunID:               c.ctxt.Bench.RunID,
 		IngestPipelineStats: make(map[string]ingest.PipelineStatsMap),
+		NodesStats:          make(map[string]ingest.NodeStats),
 		DiskUsage:           c.diskUsage,
 		TotalHits:           c.endTotalHits - c.startTotalHits,
 	}
 
-	if len(c.collectedMetrics) > 0 {
-		sum.CollectionStartTs = c.collectedMetrics[0].ts
-		sum.CollectionEndTs = 
c.collectedMetrics[len(c.collectedMetrics)-1].ts
-		sum.DataStreamStats = c.collectedMetrics[len(c.collectedMetrics)-1].dsMetrics
-		sum.ClusterName = c.collectedMetrics[0].nMetrics.ClusterName
-		sum.Nodes = len(c.collectedMetrics[len(c.collectedMetrics)-1].nMetrics.Nodes)
-	}
+	sum.ClusterName = c.startMetrics.nMetrics.ClusterName
+	sum.CollectionStartTs = c.startMetrics.ts
+	sum.CollectionEndTs = c.endMetrics.ts
+	sum.DataStreamStats = c.endMetrics.dsMetrics
+	sum.Nodes = len(c.endMetrics.nMetrics.Nodes)
 
 	for node, endPStats := range c.endIngestMetrics {
 		startPStats, found := c.startIngestMetrics[node]
@@ -178,8 +264,9 @@ func (c *collector) summarize() (*metricsSummary, error) {
 			}
 			sumStats[pname] = ingest.PipelineStats{
 				StatsRecord: ingest.StatsRecord{
-					Count:  endStats.Count - startStats.Count,
-					Failed: endStats.TimeInMillis - startStats.TimeInMillis,
+					Count:        endStats.Count - startStats.Count,
+					Failed:       endStats.Failed - startStats.Failed,
+					TimeInMillis: endStats.TimeInMillis - startStats.TimeInMillis,
 				},
 				Processors: make([]ingest.ProcessorStats, len(endStats.Processors)),
 			}
@@ -252,10 +339,10 @@ readyLoop:
 }
 
 func (c *collector) collectMetricsPreviousToStop() {
-	c.collect()
 	c.endIngestMetrics = c.collectIngestMetrics()
 	c.diskUsage = c.collectDiskUsage()
 	c.endTotalHits = c.collectTotalHits()
+	c.endMetrics = c.collect()
 }
 
 func (c *collector) collectTotalHits() int {
@@ -265,3 +352,46 @@ func (c *collector) collectTotalHits() int {
 	}
 	return totalHits
 }
+
+func (c *collector) createEventsFromMetrics(m metrics) [][]byte {
+	dsEvent := struct {
+		Ts int64 `json:"@timestamp"`
+		*ingest.DataStreamStats
+		Meta benchMeta `json:"benchmark_metadata"`
+	}{
+		Ts:              m.ts * 1000, // s to ms
+		DataStreamStats: m.dsMetrics,
+		Meta:            c.metadata,
+	}
+
+	type nEvent struct {
+		Ts          int64  `json:"@timestamp"`
+		ClusterName string `json:"cluster_name"`
+		NodeName    string `json:"node_name"`
+		*ingest.NodeStats
+		Meta benchMeta `json:"benchmark_metadata"`
+	}
+
+	var nEvents []interface{}
+
+	for node, stats := range m.nMetrics.Nodes {
+		nEvents = append(nEvents, nEvent{
+			Ts:          m.ts * 1000, // s to ms
+			ClusterName: m.nMetrics.ClusterName,
+			NodeName:    node,
+			NodeStats:   &stats,
+			Meta:        c.metadata,
+		})
+	}
+
+	var events [][]byte
+	for _, e := range append(nEvents, dsEvent) {
+		b, err := json.Marshal(e)
+		if err != nil {
+			logger.Debugf("error marshaling metrics event: %v", err)
+			continue
+		}
+		events = append(events, b)
+	}
+	return events
+}
diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go
index a2e25e2c8a..6f0e3eac8e 100644
--- a/internal/benchrunner/runners/system/runner.go
+++ b/internal/benchrunner/runners/system/runner.go
@@ -289,7 +289,6 @@ func (r *runner) run() (report reporters.Reportable, err error) {
 }
 
 func (r *runner) startMetricsColletion() {
-	// TODO send metrics to es metricstore if set
 	// TODO collect agent hosts metrics using system integration
 	r.mcollector = newCollector(
 		r.ctxt,

From 29fb96a88b8dddb21d8e15655132ec58396def76 Mon Sep 17 00:00:00 2001
From: Marc Guasch
Date: Thu, 29 Jun 2023 10:40:52 +0200
Subject: [PATCH 08/23] Add support for other service deployers

---
 cmd/benchmark.go                              |   7 +
 .../benchrunner/runners/system/metrics.go     |   8 +-
 .../benchrunner/runners/system/options.go     |   7 +
 internal/benchrunner/runners/system/runner.go |  25 +-
 .../benchrunner/runners/system/scenario.go    |   2 +-
 .../runners/system/servicedeployer/compose.go | 215 ------------------
.../runners/system/servicedeployer/context.go | 83 ------- .../runners/system/servicedeployer/factory.go | 74 ------ internal/service/boot.go | 2 +- .../_static/Dockerfile.terraform_deployer | 0 .../_static/docker-custom-agent-base.yml | 0 .../_static/terraform_deployer.yml | 0 .../_static/terraform_deployer_run.sh | 0 .../system => }/servicedeployer/compose.go | 0 .../system => }/servicedeployer/context.go | 0 .../servicedeployer/custom_agent.go | 0 .../servicedeployer/deployed_service.go | 0 .../elastic-agent-managed.yaml.tmpl | 0 .../system => }/servicedeployer/factory.go | 27 ++- .../system => }/servicedeployer/kubernetes.go | 0 .../servicedeployer/service_deployer.go | 0 .../system => }/servicedeployer/terraform.go | 8 + .../servicedeployer/terraform_env.go | 0 .../servicedeployer/terraform_test.go | 0 .../system => }/servicedeployer/variants.go | 0 internal/testrunner/runners/system/runner.go | 15 +- .../servicedeployer/deployed_service.go | 20 -- .../servicedeployer/service_deployer.go | 13 -- .../testrunner/runners/system/test_config.go | 2 +- 29 files changed, 69 insertions(+), 439 deletions(-) delete mode 100644 internal/benchrunner/runners/system/servicedeployer/compose.go delete mode 100644 internal/benchrunner/runners/system/servicedeployer/context.go delete mode 100644 internal/benchrunner/runners/system/servicedeployer/factory.go rename internal/{testrunner/runners/system => }/servicedeployer/_static/Dockerfile.terraform_deployer (100%) rename internal/{testrunner/runners/system => }/servicedeployer/_static/docker-custom-agent-base.yml (100%) rename internal/{testrunner/runners/system => }/servicedeployer/_static/terraform_deployer.yml (100%) rename internal/{testrunner/runners/system => }/servicedeployer/_static/terraform_deployer_run.sh (100%) rename internal/{testrunner/runners/system => }/servicedeployer/compose.go (100%) rename internal/{testrunner/runners/system => }/servicedeployer/context.go (100%) rename internal/{testrunner/runners/system => }/servicedeployer/custom_agent.go (100%) rename internal/{benchrunner/runners/system => }/servicedeployer/deployed_service.go (100%) rename internal/{testrunner/runners/system => }/servicedeployer/elastic-agent-managed.yaml.tmpl (100%) rename internal/{testrunner/runners/system => }/servicedeployer/factory.go (84%) rename internal/{testrunner/runners/system => }/servicedeployer/kubernetes.go (100%) rename internal/{benchrunner/runners/system => }/servicedeployer/service_deployer.go (100%) rename internal/{testrunner/runners/system => }/servicedeployer/terraform.go (95%) rename internal/{testrunner/runners/system => }/servicedeployer/terraform_env.go (100%) rename internal/{testrunner/runners/system => }/servicedeployer/terraform_test.go (100%) rename internal/{testrunner/runners/system => }/servicedeployer/variants.go (100%) delete mode 100644 internal/testrunner/runners/system/servicedeployer/deployed_service.go delete mode 100644 internal/testrunner/runners/system/servicedeployer/service_deployer.go diff --git a/cmd/benchmark.go b/cmd/benchmark.go index 622ad4abe0..de87d080eb 100644 --- a/cmd/benchmark.go +++ b/cmd/benchmark.go @@ -216,6 +216,7 @@ func getSystemCommand() *cobra.Command { cmd.Flags().BoolP(cobraext.BenchReindexToMetricstoreFlagName, "", false, cobraext.BenchReindexToMetricstoreFlagDescription) cmd.Flags().DurationP(cobraext.BenchMetricsIntervalFlagName, "", time.Second, cobraext.BenchMetricsIntervalFlagDescription) cmd.Flags().DurationP(cobraext.DeferCleanupFlagName, "", 0, 
cobraext.DeferCleanupFlagDescription) + cmd.Flags().String(cobraext.VariantFlagName, "", cobraext.VariantFlagDescription) return cmd } @@ -223,6 +224,11 @@ func getSystemCommand() *cobra.Command { func systemCommandAction(cmd *cobra.Command, args []string) error { cmd.Println("Run system benchmarks for the package") + variant, err := cmd.Flags().GetString(cobraext.VariantFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.VariantFlagName) + } + benchName, err := cmd.Flags().GetString(cobraext.BenchNameFlagName) if err != nil { return cobraext.FlagParsingError(err, cobraext.BenchNameFlagName) @@ -268,6 +274,7 @@ func systemCommandAction(cmd *cobra.Command, args []string) error { } withOpts := []system.OptionFunc{ + system.WithVariant(variant), system.WithBenchmarkName(benchName), system.WithDeferCleanup(deferCleanup), system.WithMetricsInterval(metricsInterval), diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go index 3a5434883d..0cfe7e80ba 100644 --- a/internal/benchrunner/runners/system/metrics.go +++ b/internal/benchrunner/runners/system/metrics.go @@ -11,11 +11,11 @@ import ( "sync" "time" - "github.com/elastic/elastic-package/internal/benchrunner/runners/system/servicedeployer" "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/elasticsearch/ingest" "github.com/elastic/elastic-package/internal/environment" "github.com/elastic/elastic-package/internal/logger" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/signal" ) @@ -78,7 +78,7 @@ func newCollector( ) *collector { meta := benchMeta{Parameters: scenario} meta.Info.Benchmark = benchName - meta.Info.RunID = ctxt.Bench.RunID + meta.Info.RunID = ctxt.Test.RunID return &collector{ ctxt: ctxt, interval: interval, @@ -231,12 +231,12 @@ func (c *collector) createMetricsIndex() { } func (c *collector) indexName() string { - return fmt.Sprintf("bench-metrics-%s-%s", c.datastream, c.ctxt.Bench.RunID) + return fmt.Sprintf("bench-metrics-%s-%s", c.datastream, c.ctxt.Test.RunID) } func (c *collector) summarize() (*metricsSummary, error) { sum := metricsSummary{ - RunID: c.ctxt.Bench.RunID, + RunID: c.ctxt.Test.RunID, IngestPipelineStats: make(map[string]ingest.PipelineStatsMap), NodesStats: make(map[string]ingest.NodeStats), DiskUsage: c.diskUsage, diff --git a/internal/benchrunner/runners/system/options.go b/internal/benchrunner/runners/system/options.go index a8be2fa957..308226e3c8 100644 --- a/internal/benchrunner/runners/system/options.go +++ b/internal/benchrunner/runners/system/options.go @@ -21,6 +21,7 @@ type Options struct { ESMetricsAPI *elasticsearch.API BenchName string PackageRootPath string + Variant string } type OptionFunc func(*Options) @@ -80,3 +81,9 @@ func WithESMetricsAPI(api *elasticsearch.API) OptionFunc { opts.ESMetricsAPI = api } } + +func WithVariant(name string) OptionFunc { + return func(opts *Options) { + opts.Variant = name + } +} diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 6f0e3eac8e..f5984a5bfd 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -26,13 +26,13 @@ import ( "github.com/elastic/elastic-package/internal/benchrunner" "github.com/elastic/elastic-package/internal/benchrunner/reporters" - "github.com/elastic/elastic-package/internal/benchrunner/runners/system/servicedeployer" 
"github.com/elastic/elastic-package/internal/configuration/locations" "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/kibana" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/multierror" "github.com/elastic/elastic-package/internal/packages" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/signal" ) @@ -40,6 +40,7 @@ const ( // ServiceLogsAgentDir is folder path where log files produced by the service // are stored on the Agent container's filesystem. ServiceLogsAgentDir = "/tmp/service_logs" + devDeployDir = "_dev/benchmark/system/deploy" // BenchType defining system benchmark BenchType benchrunner.Type = "system" @@ -136,7 +137,13 @@ func (r *runner) setUp() error { serviceLogsDir := locationManager.ServiceLogDir() r.ctxt.Logs.Folder.Local = serviceLogsDir r.ctxt.Logs.Folder.Agent = ServiceLogsAgentDir - r.ctxt.Bench.RunID = createRunID() + r.ctxt.Test.RunID = createRunID() + + outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.ctxt.Test.RunID) + if err != nil { + return fmt.Errorf("could not create output dir for terraform deployer %w", err) + } + r.ctxt.OutputDir = outputDir scenario, err := readConfig(r.options.PackageRootPath, r.options.BenchName, r.ctxt) if err != nil { @@ -225,9 +232,13 @@ func (r *runner) run() (report reporters.Reportable, err error) { if r.scenario.Corpora.InputService != nil { // Setup service. logger.Debug("setting up service...") - serviceDeployer, err := servicedeployer.Factory(servicedeployer.FactoryOptions{ - RootPath: r.options.PackageRootPath, - }) + opts := servicedeployer.FactoryOptions{ + PackageRootPath: r.options.PackageRootPath, + DevDeployDir: devDeployDir, + Variant: r.options.Variant, + Type: servicedeployer.TypeBench, + } + serviceDeployer, err := servicedeployer.Factory(opts) if err != nil { return nil, fmt.Errorf("could not create service runner: %w", err) @@ -735,7 +746,7 @@ func (r *runner) reindexData() error { }`, mapping)), ) - indexName := fmt.Sprintf("bench-reindex-%s-%s", r.runtimeDataStream, r.ctxt.Bench.RunID) + indexName := fmt.Sprintf("bench-reindex-%s-%s", r.runtimeDataStream, r.ctxt.Test.RunID) logger.Debugf("creating %s index in metricstore...", indexName) @@ -833,7 +844,7 @@ type benchMeta struct { func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[string]interface{} { var m benchMeta m.Info.Benchmark = r.options.BenchName - m.Info.RunID = r.ctxt.Bench.RunID + m.Info.RunID = r.ctxt.Test.RunID m.Parameters = *r.scenario e["benchmark_metadata"] = m return e diff --git a/internal/benchrunner/runners/system/scenario.go b/internal/benchrunner/runners/system/scenario.go index a81af67bf6..af02ad4405 100644 --- a/internal/benchrunner/runners/system/scenario.go +++ b/internal/benchrunner/runners/system/scenario.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/go-ucfg" "github.com/elastic/go-ucfg/yaml" - "github.com/elastic/elastic-package/internal/benchrunner/runners/system/servicedeployer" + "github.com/elastic/elastic-package/internal/servicedeployer" ) const devPath = "_dev/benchmark/system" diff --git a/internal/benchrunner/runners/system/servicedeployer/compose.go b/internal/benchrunner/runners/system/servicedeployer/compose.go deleted file mode 100644 index 0b3dd5e2c2..0000000000 --- a/internal/benchrunner/runners/system/servicedeployer/compose.go +++ /dev/null @@ -1,215 +0,0 @@ -// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -import ( - "fmt" - "os" - "path/filepath" - "time" - - "github.com/elastic/elastic-package/internal/builder" - "github.com/elastic/elastic-package/internal/compose" - "github.com/elastic/elastic-package/internal/docker" - "github.com/elastic/elastic-package/internal/files" - "github.com/elastic/elastic-package/internal/logger" - "github.com/elastic/elastic-package/internal/stack" -) - -// DockerComposeServiceDeployer knows how to deploy a service defined via -// a Docker Compose file. -type DockerComposeServiceDeployer struct { - ymlPaths []string -} - -type dockerComposeDeployedService struct { - ctxt ServiceContext - - ymlPaths []string - project string -} - -// NewDockerComposeServiceDeployer returns a new instance of a DockerComposeServiceDeployer. -func NewDockerComposeServiceDeployer(ymlPaths []string) (*DockerComposeServiceDeployer, error) { - return &DockerComposeServiceDeployer{ - ymlPaths: ymlPaths, - }, nil -} - -// SetUp sets up the service and returns any relevant information. -func (d *DockerComposeServiceDeployer) SetUp(inCtxt ServiceContext) (DeployedService, error) { - logger.Debug("setting up service using Docker Compose service deployer") - service := dockerComposeDeployedService{ - ymlPaths: d.ymlPaths, - project: "elastic-package-service", - } - outCtxt := inCtxt - - p, err := compose.NewProject(service.project, service.ymlPaths...) - if err != nil { - return nil, fmt.Errorf("could not create Docker Compose project for service: %w", err) - } - - // Verify the Elastic stack network - err = stack.EnsureStackNetworkUp() - if err != nil { - return nil, fmt.Errorf("elastic stack network is not ready: %w", err) - } - - // Clean service logs - err = files.RemoveContent(outCtxt.Logs.Folder.Local) - if err != nil { - return nil, fmt.Errorf("removing service logs failed: %w", err) - } - - serviceName := inCtxt.Name - opts := compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, outCtxt.Logs.Folder.Local)}, - ExtraArgs: []string{"--build", "-d"}, - } - err = p.Up(opts) - if err != nil { - return nil, fmt.Errorf("could not boot up service using Docker Compose: %w", err) - } - err = p.WaitForHealthy(opts) - if err != nil { - processServiceContainerLogs(p, compose.CommandOptions{ - Env: opts.Env, - }, outCtxt.Name) - return nil, fmt.Errorf("service is unhealthy: %w", err) - } - - // Build service container name - outCtxt.Hostname = p.ContainerName(serviceName) - - // Connect service network with stack network (for the purpose of metrics collection) - err = docker.ConnectToNetwork(p.ContainerName(serviceName), stack.Network()) - if err != nil { - return nil, fmt.Errorf("can't attach service container to the stack network: %w", err) - } - - logger.Debugf("adding service container %s internal ports to context", p.ContainerName(serviceName)) - serviceComposeConfig, err := p.Config(compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, outCtxt.Logs.Folder.Local)}, - }) - if err != nil { - return nil, fmt.Errorf("could not get Docker Compose configuration for service: %w", err) - } - - s := serviceComposeConfig.Services[serviceName] - outCtxt.Ports = make([]int, len(s.Ports)) - for idx, port := range s.Ports { - outCtxt.Ports[idx] = port.InternalPort - } - - // Shortcut to first port for convenience - if 
len(outCtxt.Ports) > 0 { - outCtxt.Port = outCtxt.Ports[0] - } - - outCtxt.Agent.Host.NamePrefix = "docker-fleet-agent" - service.ctxt = outCtxt - return &service, nil -} - -// Signal sends a signal to the service. -func (s *dockerComposeDeployedService) Signal(signal string) error { - p, err := compose.NewProject(s.project, s.ymlPaths...) - if err != nil { - return fmt.Errorf("could not create Docker Compose project for service: %w", err) - } - - opts := compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, s.ctxt.Logs.Folder.Local)}, - ExtraArgs: []string{"-s", signal}, - } - if s.ctxt.Name != "" { - opts.Services = append(opts.Services, s.ctxt.Name) - } - - if err := p.Kill(opts); err != nil { - return fmt.Errorf("could not send %q signal: %w", signal, err) - } - return nil -} - -// TearDown tears down the service. -func (s *dockerComposeDeployedService) TearDown() error { - logger.Debugf("tearing down service using Docker Compose runner") - defer func() { - err := files.RemoveContent(s.ctxt.Logs.Folder.Local) - if err != nil { - logger.Errorf("could not remove the service logs (path: %s)", s.ctxt.Logs.Folder.Local) - } - }() - - p, err := compose.NewProject(s.project, s.ymlPaths...) - if err != nil { - return fmt.Errorf("could not create Docker Compose project for service: %w", err) - } - - opts := compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, s.ctxt.Logs.Folder.Local)}, - } - processServiceContainerLogs(p, opts, s.ctxt.Name) - - if err := p.Down(compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, s.ctxt.Logs.Folder.Local)}, - ExtraArgs: []string{"--volumes"}, // Remove associated volumes. - }); err != nil { - return fmt.Errorf("could not shut down service using Docker Compose: %w", err) - } - return nil -} - -// Context returns the current context for the service. -func (s *dockerComposeDeployedService) Context() ServiceContext { - return s.ctxt -} - -// SetContext sets the current context for the service. 
-func (s *dockerComposeDeployedService) SetContext(ctxt ServiceContext) error { - s.ctxt = ctxt - return nil -} - -func processServiceContainerLogs(p *compose.Project, opts compose.CommandOptions, serviceName string) { - content, err := p.Logs(opts) - if err != nil { - logger.Errorf("can't export service logs: %v", err) - return - } - - if len(content) == 0 { - logger.Info("service container hasn't written anything logs.") - return - } - - err = writeServiceContainerLogs(serviceName, content) - if err != nil { - logger.Errorf("can't write service container logs: %v", err) - } -} - -func writeServiceContainerLogs(serviceName string, content []byte) error { - buildDir, err := builder.BuildDirectory() - if err != nil { - return fmt.Errorf("locating build directory failed: %w", err) - } - - containerLogsDir := filepath.Join(buildDir, "container-logs") - err = os.MkdirAll(containerLogsDir, 0755) - if err != nil { - return fmt.Errorf("can't create directory for service container logs (path: %s): %w", containerLogsDir, err) - } - - containerLogsFilepath := filepath.Join(containerLogsDir, fmt.Sprintf("%s-%d.log", serviceName, time.Now().UnixNano())) - logger.Infof("Write container logs to file: %s", containerLogsFilepath) - err = os.WriteFile(containerLogsFilepath, content, 0644) - if err != nil { - return fmt.Errorf("can't write container logs to file (path: %s): %w", containerLogsFilepath, err) - } - return nil -} diff --git a/internal/benchrunner/runners/system/servicedeployer/context.go b/internal/benchrunner/runners/system/servicedeployer/context.go deleted file mode 100644 index d9c0aabf5c..0000000000 --- a/internal/benchrunner/runners/system/servicedeployer/context.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -const ( - LocalCACertEnv = "LOCAL_CA_CERT" - ServiceLogsDirEnv = "SERVICE_LOGS_DIR" - BenchRunIDEnv = "BENCH_RUN_ID" -) - -// ServiceContext encapsulates context that is both available to a ServiceDeployer and -// populated by a DeployedService. The fields in ServiceContext may be used in handlebars -// templates in system benchmark configuration files, for example: {{ Hostname }}. -type ServiceContext struct { - // Name is the name of the service. - Name string - - // Hostname is the host name of the service, as addressable from - // the Agent container. - Hostname string - - // Ports is a list of ports that the service listens on, as addressable - // from the Agent container. - Ports []int - - // Port points to the first port in the list of ports. It's provided as - // a convenient shortcut as most services tend to listen on a single port. - Port int - - // Logs contains folder paths for log files produced by the service. - Logs struct { - Folder struct { - // Local contains the folder path where log files produced by - // the service are stored on the local filesystem, i.e. where - // elastic-package is running. - Local string - - // Agent contains the folder path where log files produced by - // the service are stored on the Agent container's filesystem. - Agent string - } - } - - // Bench related properties. - Bench struct { - // RunID identifies the current benchmark run. - RunID string - } - - // Agent related properties. - Agent struct { - // Host describes the machine which is running the agent. 
- Host struct { - // Name prefix for the host's name - NamePrefix string - } - } - - // CustomProperties store additional data used to boot up the service, e.g. AWS credentials. - CustomProperties map[string]interface{} -} - -// Aliases method returned aliases to properties of the service context. -func (sc *ServiceContext) Aliases() map[string]interface{} { - m := map[string]interface{}{ - ServiceLogsDirEnv: func() interface{} { - return sc.Logs.Folder.Agent - }, - BenchRunIDEnv: func() interface{} { - return sc.Bench.RunID - }, - } - - for k, v := range sc.CustomProperties { - var that = v - m[k] = func() interface{} { // wrap as function - return that - } - } - return m -} diff --git a/internal/benchrunner/runners/system/servicedeployer/factory.go b/internal/benchrunner/runners/system/servicedeployer/factory.go deleted file mode 100644 index b7b7145d8e..0000000000 --- a/internal/benchrunner/runners/system/servicedeployer/factory.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -import ( - "errors" - "fmt" - "os" - "path/filepath" -) - -const devDeployDir = "_dev/benchmark/system/deploy" - -// FactoryOptions defines options used to create an instance of a service deployer. -type FactoryOptions struct { - RootPath string -} - -// Factory chooses the appropriate service runner for the given data stream, depending -// on service configuration files defined in the package or data stream. -func Factory(options FactoryOptions) (ServiceDeployer, error) { - devDeployPath, err := FindDevDeployPath(options) - if err != nil { - return nil, fmt.Errorf("can't find \"%s\" directory: %w", devDeployDir, err) - } - - serviceDeployerName, err := findServiceDeployer(devDeployPath) - if err != nil { - return nil, fmt.Errorf("can't find any valid service deployer: %w", err) - } - - serviceDeployerPath := filepath.Join(devDeployPath, serviceDeployerName) - - switch serviceDeployerName { - case "docker": - dockerComposeYMLPath := filepath.Join(serviceDeployerPath, "docker-compose.yml") - if _, err := os.Stat(dockerComposeYMLPath); err == nil { - return NewDockerComposeServiceDeployer([]string{dockerComposeYMLPath}) - } - } - return nil, fmt.Errorf("unsupported service deployer (name: %s)", serviceDeployerName) -} - -// FindDevDeployPath function returns a path reference to the "_dev/deploy" directory. 
-func FindDevDeployPath(options FactoryOptions) (string, error) { - path := filepath.Join(options.RootPath, devDeployDir) - if _, err := os.Stat(path); err == nil { - return path, nil - } else if !errors.Is(err, os.ErrNotExist) { - return "", fmt.Errorf("stat failed for path (path: %s): %w", path, err) - } - return "", fmt.Errorf("\"%s\" directory doesn't exist", devDeployDir) -} - -func findServiceDeployer(devDeployPath string) (string, error) { - fis, err := os.ReadDir(devDeployPath) - if err != nil { - return "", fmt.Errorf("can't read directory (path: %s): %w", devDeployDir, err) - } - - var folders []os.DirEntry - for _, fi := range fis { - if fi.IsDir() { - folders = append(folders, fi) - } - } - - if len(folders) != 1 { - return "", fmt.Errorf("expected to find only one service deployer in \"%s\"", devDeployPath) - } - return folders[0].Name(), nil -} diff --git a/internal/service/boot.go b/internal/service/boot.go index cd8549d677..bca21d69d1 100644 --- a/internal/service/boot.go +++ b/internal/service/boot.go @@ -13,8 +13,8 @@ import ( "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/configuration/locations" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/testrunner/runners/system" - "github.com/elastic/elastic-package/internal/testrunner/runners/system/servicedeployer" ) // Options define the details of the service which should be booted up. diff --git a/internal/testrunner/runners/system/servicedeployer/_static/Dockerfile.terraform_deployer b/internal/servicedeployer/_static/Dockerfile.terraform_deployer similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/_static/Dockerfile.terraform_deployer rename to internal/servicedeployer/_static/Dockerfile.terraform_deployer diff --git a/internal/testrunner/runners/system/servicedeployer/_static/docker-custom-agent-base.yml b/internal/servicedeployer/_static/docker-custom-agent-base.yml similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/_static/docker-custom-agent-base.yml rename to internal/servicedeployer/_static/docker-custom-agent-base.yml diff --git a/internal/testrunner/runners/system/servicedeployer/_static/terraform_deployer.yml b/internal/servicedeployer/_static/terraform_deployer.yml similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/_static/terraform_deployer.yml rename to internal/servicedeployer/_static/terraform_deployer.yml diff --git a/internal/testrunner/runners/system/servicedeployer/_static/terraform_deployer_run.sh b/internal/servicedeployer/_static/terraform_deployer_run.sh similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/_static/terraform_deployer_run.sh rename to internal/servicedeployer/_static/terraform_deployer_run.sh diff --git a/internal/testrunner/runners/system/servicedeployer/compose.go b/internal/servicedeployer/compose.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/compose.go rename to internal/servicedeployer/compose.go diff --git a/internal/testrunner/runners/system/servicedeployer/context.go b/internal/servicedeployer/context.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/context.go rename to internal/servicedeployer/context.go diff --git a/internal/testrunner/runners/system/servicedeployer/custom_agent.go b/internal/servicedeployer/custom_agent.go similarity index 100% rename from 
internal/testrunner/runners/system/servicedeployer/custom_agent.go rename to internal/servicedeployer/custom_agent.go diff --git a/internal/benchrunner/runners/system/servicedeployer/deployed_service.go b/internal/servicedeployer/deployed_service.go similarity index 100% rename from internal/benchrunner/runners/system/servicedeployer/deployed_service.go rename to internal/servicedeployer/deployed_service.go diff --git a/internal/testrunner/runners/system/servicedeployer/elastic-agent-managed.yaml.tmpl b/internal/servicedeployer/elastic-agent-managed.yaml.tmpl similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/elastic-agent-managed.yaml.tmpl rename to internal/servicedeployer/elastic-agent-managed.yaml.tmpl diff --git a/internal/testrunner/runners/system/servicedeployer/factory.go b/internal/servicedeployer/factory.go similarity index 84% rename from internal/testrunner/runners/system/servicedeployer/factory.go rename to internal/servicedeployer/factory.go index 27f8f5e113..495fb3cf7f 100644 --- a/internal/testrunner/runners/system/servicedeployer/factory.go +++ b/internal/servicedeployer/factory.go @@ -11,12 +11,17 @@ import ( "path/filepath" ) -const devDeployDir = "_dev/deploy" +const ( + TypeTest = "test" + TypeBench = "bench" +) // FactoryOptions defines options used to create an instance of a service deployer. type FactoryOptions struct { PackageRootPath string DataStreamRootPath string + DevDeployDir string + Type string Variant string } @@ -26,7 +31,7 @@ type FactoryOptions struct { func Factory(options FactoryOptions) (ServiceDeployer, error) { devDeployPath, err := FindDevDeployPath(options) if err != nil { - return nil, fmt.Errorf("can't find \"%s\" directory: %w", devDeployDir, err) + return nil, fmt.Errorf("can't find \"%s\" directory: %w", options.DevDeployDir, err) } serviceDeployerName, err := findServiceDeployer(devDeployPath) @@ -51,6 +56,9 @@ func Factory(options FactoryOptions) (ServiceDeployer, error) { return NewDockerComposeServiceDeployer([]string{dockerComposeYMLPath}, sv) } case "agent": + if options.Type != TypeTest { + return nil, fmt.Errorf("agent deployer is not supported for type %s", options.Type) + } customAgentCfgYMLPath := filepath.Join(serviceDeployerPath, "custom-agent.yml") if _, err := os.Stat(customAgentCfgYMLPath); err != nil { return nil, fmt.Errorf("can't find expected file custom-agent.yml: %w", err) @@ -67,28 +75,27 @@ func Factory(options FactoryOptions) (ServiceDeployer, error) { // FindDevDeployPath function returns a path reference to the "_dev/deploy" directory. 
func FindDevDeployPath(options FactoryOptions) (string, error) { - dataStreamDevDeployPath := filepath.Join(options.DataStreamRootPath, devDeployDir) - _, err := os.Stat(dataStreamDevDeployPath) - if err == nil { + dataStreamDevDeployPath := filepath.Join(options.DataStreamRootPath, options.DevDeployDir) + if _, err := os.Stat(dataStreamDevDeployPath); err == nil { return dataStreamDevDeployPath, nil } else if !errors.Is(err, os.ErrNotExist) { return "", fmt.Errorf("stat failed for data stream (path: %s): %w", dataStreamDevDeployPath, err) } - packageDevDeployPath := filepath.Join(options.PackageRootPath, devDeployDir) - _, err = os.Stat(packageDevDeployPath) - if err == nil { + packageDevDeployPath := filepath.Join(options.PackageRootPath, options.DevDeployDir) + if _, err := os.Stat(packageDevDeployPath); err == nil { return packageDevDeployPath, nil } else if !errors.Is(err, os.ErrNotExist) { return "", fmt.Errorf("stat failed for package (path: %s): %w", packageDevDeployPath, err) } - return "", fmt.Errorf("\"%s\" directory doesn't exist", devDeployDir) + + return "", fmt.Errorf("%q directory doesn't exist", options.DevDeployDir) } func findServiceDeployer(devDeployPath string) (string, error) { fis, err := os.ReadDir(devDeployPath) if err != nil { - return "", fmt.Errorf("can't read directory (path: %s): %w", devDeployDir, err) + return "", fmt.Errorf("can't read directory (path: %s): %w", devDeployPath, err) } var folders []os.DirEntry diff --git a/internal/testrunner/runners/system/servicedeployer/kubernetes.go b/internal/servicedeployer/kubernetes.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/kubernetes.go rename to internal/servicedeployer/kubernetes.go diff --git a/internal/benchrunner/runners/system/servicedeployer/service_deployer.go b/internal/servicedeployer/service_deployer.go similarity index 100% rename from internal/benchrunner/runners/system/servicedeployer/service_deployer.go rename to internal/servicedeployer/service_deployer.go diff --git a/internal/testrunner/runners/system/servicedeployer/terraform.go b/internal/servicedeployer/terraform.go similarity index 95% rename from internal/testrunner/runners/system/servicedeployer/terraform.go rename to internal/servicedeployer/terraform.go index 1a61125224..8bbb0c65d4 100644 --- a/internal/testrunner/runners/system/servicedeployer/terraform.go +++ b/internal/servicedeployer/terraform.go @@ -210,4 +210,12 @@ func (tsd TerraformServiceDeployer) installDockerfile() (string, error) { return tfDir, nil } +func CreateOutputDir(locationManager *locations.LocationManager, runId string) (string, error) { + outputDir := filepath.Join(locationManager.ServiceOutputDir(), runId) + if err := os.MkdirAll(outputDir, 0755); err != nil { + return "", fmt.Errorf("failed to create output directory: %w", err) + } + return outputDir, nil +} + var _ ServiceDeployer = new(TerraformServiceDeployer) diff --git a/internal/testrunner/runners/system/servicedeployer/terraform_env.go b/internal/servicedeployer/terraform_env.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/terraform_env.go rename to internal/servicedeployer/terraform_env.go diff --git a/internal/testrunner/runners/system/servicedeployer/terraform_test.go b/internal/servicedeployer/terraform_test.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/terraform_test.go rename to internal/servicedeployer/terraform_test.go diff --git 
a/internal/testrunner/runners/system/servicedeployer/variants.go b/internal/servicedeployer/variants.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/variants.go rename to internal/servicedeployer/variants.go diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go index f1af3f0277..761b064f0e 100644 --- a/internal/testrunner/runners/system/runner.go +++ b/internal/testrunner/runners/system/runner.go @@ -24,10 +24,10 @@ import ( "github.com/elastic/elastic-package/internal/multierror" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/packages/installer" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/signal" "github.com/elastic/elastic-package/internal/stack" "github.com/elastic/elastic-package/internal/testrunner" - "github.com/elastic/elastic-package/internal/testrunner/runners/system/servicedeployer" ) const ( @@ -35,6 +35,7 @@ const ( testRunMinID = 10000 allFieldsBody = `{"fields": ["*"]}` + devDeployDir = "_dev/deploy" ) func init() { @@ -200,6 +201,7 @@ func (r *runner) run() (results []testrunner.TestResult, err error) { devDeployPath, err := servicedeployer.FindDevDeployPath(servicedeployer.FactoryOptions{ PackageRootPath: r.options.PackageRootPath, DataStreamRootPath: dataStreamPath, + DevDeployDir: devDeployDir, }) if err != nil { return result.WithError(fmt.Errorf("_dev/deploy directory not found: %w", err)) @@ -252,6 +254,7 @@ func (r *runner) runTestPerVariant(result *testrunner.ResultComposer, locationMa PackageRootPath: r.options.PackageRootPath, DataStreamRootPath: dataStreamPath, Variant: variantName, + Type: servicedeployer.TypeTest, } var ctxt servicedeployer.ServiceContext @@ -260,7 +263,7 @@ func (r *runner) runTestPerVariant(result *testrunner.ResultComposer, locationMa ctxt.Logs.Folder.Agent = ServiceLogsAgentDir ctxt.Test.RunID = createTestRunID() - outputDir, err := createOutputDir(locationManager, ctxt.Test.RunID) + outputDir, err := servicedeployer.CreateOutputDir(locationManager, ctxt.Test.RunID) if err != nil { return nil, fmt.Errorf("could not create output dir for terraform deployer %w", err) } @@ -293,14 +296,6 @@ func (r *runner) runTestPerVariant(result *testrunner.ResultComposer, locationMa return partial, nil } -func createOutputDir(locationManager *locations.LocationManager, runId string) (string, error) { - outputDir := filepath.Join(locationManager.ServiceOutputDir(), runId) - if err := os.MkdirAll(outputDir, 0755); err != nil { - return "", fmt.Errorf("failed to create output directory: %w", err) - } - return outputDir, nil -} - func createTestRunID() string { return fmt.Sprintf("%d", rand.Intn(testRunMaxID-testRunMinID)+testRunMinID) } diff --git a/internal/testrunner/runners/system/servicedeployer/deployed_service.go b/internal/testrunner/runners/system/servicedeployer/deployed_service.go deleted file mode 100644 index ebd1a87f9e..0000000000 --- a/internal/testrunner/runners/system/servicedeployer/deployed_service.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -// DeployedService defines the interface for interacting with a service that has been deployed. 
-type DeployedService interface { - // TearDown implements the logic for tearing down a service. - TearDown() error - - // Signal sends a signal to the service. - Signal(signal string) error - - // Context returns the current context from the service. - Context() ServiceContext - - // SetContext sets the current context for the service. - SetContext(str ServiceContext) error -} diff --git a/internal/testrunner/runners/system/servicedeployer/service_deployer.go b/internal/testrunner/runners/system/servicedeployer/service_deployer.go deleted file mode 100644 index 5e1cb93afe..0000000000 --- a/internal/testrunner/runners/system/servicedeployer/service_deployer.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -// ServiceDeployer defines the interface for deploying a service. It defines methods for -// controlling the lifecycle of a service. -type ServiceDeployer interface { - // SetUp implements the logic for setting up a service. It takes a context and returns a - // ServiceHandler. - SetUp(ctxt ServiceContext) (DeployedService, error) -} diff --git a/internal/testrunner/runners/system/test_config.go b/internal/testrunner/runners/system/test_config.go index 63c374ba3a..6a55f9d27d 100644 --- a/internal/testrunner/runners/system/test_config.go +++ b/internal/testrunner/runners/system/test_config.go @@ -19,8 +19,8 @@ import ( "github.com/elastic/go-ucfg/yaml" "github.com/elastic/elastic-package/internal/common" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/testrunner" - "github.com/elastic/elastic-package/internal/testrunner/runners/system/servicedeployer" ) var systemTestConfigFilePattern = regexp.MustCompile(`^test-([a-z0-9_.-]+)-config.yml$`) From a72a1ed20f72fb0a4acc79b2052077be8425bf3a Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 3 Jul 2023 12:25:41 +0200 Subject: [PATCH 09/23] Gracefully handle errors --- cmd/benchmark.go | 9 +++++++-- internal/benchrunner/runner.go | 4 ++-- scripts/test-check-packages.sh | 7 +++---- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/cmd/benchmark.go b/cmd/benchmark.go index de87d080eb..4769f5085f 100644 --- a/cmd/benchmark.go +++ b/cmd/benchmark.go @@ -304,14 +304,19 @@ func systemCommandAction(cmd *cobra.Command, args []string) error { return fmt.Errorf("system benchmark is expected to return multiple reports") } + reports := multiReport.Split() + if len(reports) != 2 { + return fmt.Errorf("system benchmark is expected to return a human and a file report") + } + // human report will always be the first - human := multiReport.Split()[0] + human := reports[0] if err := reporters.WriteReportable(reporters.Output(outputs.ReportOutputSTDOUT), human); err != nil { return fmt.Errorf("error writing benchmark report: %w", err) } // file report will always be the second - file := multiReport.Split()[1] + file := reports[1] if err := reporters.WriteReportable(reporters.Output(outputs.ReportOutputFile), file); err != nil { return fmt.Errorf("error writing benchmark report: %w", err) } diff --git a/internal/benchrunner/runner.go b/internal/benchrunner/runner.go index 30058cd009..c356bb48b5 100644 --- a/internal/benchrunner/runner.go +++ b/internal/benchrunner/runner.go @@ -30,12 +30,12 @@ func Run(runner Runner) (reporters.Reportable,
error) { defer func() { // we want to ensure correct tear down of the benchmark in any situation if rerr := recover(); rerr != nil { - logger.Errorf("panic occurred: %w", rerr) + logger.Errorf("panic occurred: %v", rerr) } tdErr := runner.TearDown() if tdErr != nil { - logger.Errorf("could not teardown benchmark runner: %w", tdErr) + logger.Errorf("could not teardown benchmark runner: %v", tdErr) } }() diff --git a/scripts/test-check-packages.sh b/scripts/test-check-packages.sh index 3596656b27..13e1b3c1bc 100755 --- a/scripts/test-check-packages.sh +++ b/scripts/test-check-packages.sh @@ -85,10 +85,9 @@ for d in test/packages/${PACKAGE_TEST_TYPE:-other}/${PACKAGE_UNDER_TEST:-*}/; do --old ${OLDPWD}/build/benchmark-results-old \ --threshold 1 --report-output-path="${OLDPWD}/build/benchreport" fi - # FIXME: running system benchmark in package "system_benchmark" fails with panic - # if [ "${package_to_test}" == "system_benchmark" ]; then - # elastic-package benchmark system --benchmark logs-benchmark -v --defer-cleanup 1s - # fi + if [ "${package_to_test}" == "system_benchmark" ]; then + elastic-package benchmark system --benchmark logs-benchmark -v --defer-cleanup 1s + fi else # defer-cleanup is set to a short period to verify that the option is available elastic-package test -v --report-format xUnit --report-output file --defer-cleanup 1s --test-coverage From b002a0d70def22d2c6fe36a0939d1e68533803d0 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 3 Jul 2023 12:26:04 +0200 Subject: [PATCH 10/23] Allow setting custom policy_template value --- internal/benchrunner/runners/system/runner.go | 7 ++++++- internal/benchrunner/runners/system/scenario.go | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index f5984a5bfd..c096290623 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -262,6 +262,7 @@ func (r *runner) run() (report reporters.Reportable, err error) { } r.startMetricsColletion() + defer r.mcollector.stop() // if there is a generator config, generate the data if r.generator != nil { @@ -384,13 +385,17 @@ func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *k r.scenario.Package = pkgManifest.Name } + if r.scenario.PolicyTemplate == "" { + r.scenario.PolicyTemplate = pkgManifest.PolicyTemplates[0].Name + } + // TODO: add ability to define which policy template to use pp := kibana.PackagePolicy{ Namespace: "ep", PolicyID: p.ID, Force: true, Inputs: map[string]kibana.PackagePolicyInput{ - fmt.Sprintf("%s-%s", pkgManifest.PolicyTemplates[0].Name, r.scenario.Input): { + fmt.Sprintf("%s-%s", r.scenario.PolicyTemplate, r.scenario.Input): { Enabled: true, Vars: r.scenario.Vars, Streams: map[string]kibana.PackagePolicyStream{ diff --git a/internal/benchrunner/runners/system/scenario.go b/internal/benchrunner/runners/system/scenario.go index af02ad4405..f0837d3e01 100644 --- a/internal/benchrunner/runners/system/scenario.go +++ b/internal/benchrunner/runners/system/scenario.go @@ -24,6 +24,7 @@ type scenario struct { Package string `config:"package" json:"package"` Description string `config:"description" json:"description"` Version string `config:"version" json:"version"` + PolicyTemplate string `config:"policy_template" json:"policy_template"` Input string `config:"input" json:"input"` Vars map[string]interface{} `config:"vars" json:"vars"` DataStream dataStream `config:"data_stream" 
json:"data_stream"` From d7a0a75bf4dad658d82df8c5ef5697a4fc628955 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 3 Jul 2023 12:45:38 +0200 Subject: [PATCH 11/23] Improve metrics collection lifecycle --- .../benchrunner/runners/system/metrics.go | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go index 0cfe7e80ba..399c2444ac 100644 --- a/internal/benchrunner/runners/system/metrics.go +++ b/internal/benchrunner/runners/system/metrics.go @@ -8,7 +8,9 @@ import ( "bytes" "encoding/json" "fmt" + "io" "sync" + "sync/atomic" "time" "github.com/elastic/elastic-package/internal/elasticsearch" @@ -16,7 +18,6 @@ import ( "github.com/elastic/elastic-package/internal/environment" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/servicedeployer" - "github.com/elastic/elastic-package/internal/signal" ) var ( @@ -37,8 +38,10 @@ type collector struct { datastream string pipelinePrefix string - stopC chan struct{} - tick *time.Ticker + wg sync.WaitGroup + stopped atomic.Bool + stopC chan struct{} + tick *time.Ticker startIngestMetrics map[string]ingest.PipelineStatsMap endIngestMetrics map[string]ingest.PipelineStatsMap @@ -97,32 +100,25 @@ func (c *collector) start() { c.createMetricsIndex() var once sync.Once + c.wg.Add(1) go func() { - once.Do(c.waitUntilReady) - defer c.tick.Stop() - - c.startIngestMetrics = c.collectIngestMetrics() - c.startTotalHits = c.collectTotalHits() - c.startMetrics = c.collect() - c.publish(c.createEventsFromMetrics(c.startMetrics)) - + defer c.wg.Done() for { - if signal.SIGINT() { - logger.Debug("SIGINT: cancel metrics collection") - c.collectMetricsPreviousToStop() - c.publish(c.createEventsFromMetrics(c.endMetrics)) - return - } - select { case <-c.stopC: // last collect before stopping c.collectMetricsPreviousToStop() c.publish(c.createEventsFromMetrics(c.endMetrics)) - c.stopC <- struct{}{} return case <-c.tick.C: + once.Do(func() { + c.waitUntilReady() + c.startIngestMetrics = c.collectIngestMetrics() + c.startTotalHits = c.collectTotalHits() + c.startMetrics = c.collect() + c.publish(c.createEventsFromMetrics(c.startMetrics)) + }) m := c.collect() c.publish(c.createEventsFromMetrics(m)) } @@ -131,9 +127,11 @@ func (c *collector) start() { } func (c *collector) stop() { - c.stopC <- struct{}{} - <-c.stopC + if !c.stopped.CompareAndSwap(false, true) { + return + } close(c.stopC) + c.wg.Wait() } func (c *collector) collect() metrics { @@ -166,18 +164,17 @@ func (c *collector) publish(events [][]byte) { reqBody := bytes.NewReader(e) resp, err := c.msapi.Index(c.indexName(), reqBody) if err != nil { - logger.Debugf("error indexing metrics: %e", err) + logger.Debugf("error indexing event: %v", err) continue } - var sr map[string]interface{} - if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil { - logger.Debugf("error decoding search response: %e", err) + + body, err := io.ReadAll(resp.Body) + if err != nil { + logger.Errorf("failed to read index response body: %v", err) } - resErr, found := sr["error"] - if found { - errStr := resErr.(map[string]interface{})["reason"].(string) - logger.Debugf("error searching for documents: %s", errStr) + if resp.StatusCode != 201 { + logger.Errorf("error indexing event (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body)) } } } @@ -298,12 +295,11 @@ func (c *collector) waitUntilReady() { readyLoop: for { - if signal.SIGINT() 
{ - logger.Debug("SIGINT: cancel metrics collection") + select { + case <-c.stopC: return + case <-waitTick.C: } - - <-waitTick.C dsstats, err := ingest.GetDataStreamStats(c.esapi, c.datastream) if err != nil { logger.Debug(err) @@ -315,7 +311,11 @@ readyLoop: if c.scenario.WarmupTimePeriod > 0 { logger.Debugf("waiting %s for warmup period", c.scenario.WarmupTimePeriod) - <-time.After(c.scenario.WarmupTimePeriod) + select { + case <-c.stopC: + return + case <-time.After(c.scenario.WarmupTimePeriod): + } } logger.Debug("metric collection starting...") } From 70d5931496124c19b029031d5a42f81d447a46c5 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Tue, 4 Jul 2023 11:28:35 +0200 Subject: [PATCH 12/23] Add docs --- docs/howto/system_benchmarking.md | 162 +++++++++++++++++++++++++++++- 1 file changed, 160 insertions(+), 2 deletions(-) diff --git a/docs/howto/system_benchmarking.md b/docs/howto/system_benchmarking.md index df6bbe5d89..6d2655708a 100644 --- a/docs/howto/system_benchmarking.md +++ b/docs/howto/system_benchmarking.md @@ -42,8 +42,8 @@ Optionally system benchmarks can define a configuration for deploying a package' `` - a name of the supported service deployer: * `docker` - Docker Compose - -**TODO**: support other service deployers +* `k8s` - Kubernetes +* `tf` - Terraform ### Docker Compose service deployer @@ -54,6 +54,163 @@ The `docker-compose.yml` file defines the integration service(s) for the package when they are stopped. Docker compose may not be able to find volumes defined in the Dockerfile for this cleanup. In these cases, override the volume definition. +### Terraform service deployer + +When using the Terraform service deployer, the `` must include at least one `*.tf` file. +The `*.tf` files define the infrastructure using the Terraform syntax. The Terraform-based service can be handy to boot up +resources using a selected cloud provider and use them for testing (e.g. to observe and collect metrics). + +Sample `main.tf` definition: + +``` +variable "TEST_RUN_ID" { + default = "detached" +} + +provider "aws" {} + +resource "aws_instance" "i" { + ami = data.aws_ami.latest-amzn.id + monitoring = true + instance_type = "t1.micro" + tags = { + Name = "elastic-package-test-${var.TEST_RUN_ID}" + } +} + +data "aws_ami" "latest-amzn" { + most_recent = true + owners = [ "amazon" ] # AWS + filter { + name = "name" + values = ["amzn2-ami-hvm-*"] + } +} +``` + +Notice the use of the `TEST_RUN_ID` variable. It contains a unique ID, which can help differentiate resources created in potential concurrent test runs. + +#### Terraform Outputs + +The outputs generated by the Terraform service deployer can be accessed in the system test config using handlebars templates.
+For example, if an `SQS queue` is configured in Terraform and the `queue_url` is configured as an output, it can be used in the test config as a handlebars template `{{TF_OUTPUT_queue_url}}`. + +Sample Terraform definition: + +``` +resource "aws_sqs_queue" "test" { + +} + +output "queue_url"{ + value = aws_sqs_queue.test.url +} +``` + +Sample system test config: + +``` yaml +data_stream: + vars: + period: 5m + latency: 10m + queue_url: '{{TF_OUTPUT_queue_url}}' + tags_filter: |- + - key: Name + value: "elastic-package-test-{{TEST_RUN_ID}}" +``` + +For complex outputs from Terraform you can use `{{TF_OUTPUT_root_key.nested_key}}`: + +``` +output "root_key"{ + value = someoutput.nested_key_value +} +``` +``` json +{ + "root_key": { + "sensitive": false, + "type": [ + "object", + { + "nested_key": "string" + } + ], + "value": { + "nested_key": "this is a nested key" + } + } +} +``` +``` yaml +data_stream: + vars: + queue_url: '{{TF_OUTPUT_root_key.nested_key}}' +``` + +#### Environment variables + +To use environment variables within the Terraform service deployer, an `env.yml` file is required. + +The file should be structured like this: + +```yaml +version: '2.3' +services: + terraform: + environment: + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} +``` + +Its purpose is to inject environment variables into the Terraform service deployer environment. + +To specify a default, use this syntax: `AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-default}`, replacing `default` with the desired default value. + +**NOTE**: Terraform requires variables set through the environment to be prefixed with `TF_VAR_`. These variables are not available in test case definitions because they are [not injected](https://github.com/elastic/elastic-package/blob/f5312b6022e3527684e591f99e73992a73baafcf/internal/testrunner/runners/system/servicedeployer/terraform_env.go#L43) into the test environment. + +#### Cloud Provider CI support + +Terraform is often used to interact with Cloud Providers. This requires Cloud Provider credentials. + +Injecting credentials can be achieved with functions from the [`apm-pipeline-library`](https://github.com/elastic/apm-pipeline-library/tree/main/vars) Jenkins library. For example, look for `withAzureCredentials`, `withAWSEnv` or `withGCPEnv`. + +#### Tagging/labelling created Cloud Provider resources + +Leveraging Terraform to create cloud resources is useful but risks creating leftover resources that are difficult to remove. + +There are some specific environment variables that should be leveraged to overcome this issue; these variables are already injected to be used by Terraform (through `TF_VAR_`): +- `TF_VAR_TEST_RUN_ID`: a unique identifier for the test run that allows distinguishing each run +- `BRANCH_NAME_LOWER_CASE`: the branch name or PR number the CI run is linked to +- `BUILD_ID`: incremental number providing the current CI run number +- `CREATED_DATE`: the creation date of the resource, in epoch time (milliseconds) +- `ENVIRONMENT`: what environment created the resource (`ci`) +- `REPO`: the GitHub repository name (`elastic-package`) + +### Kubernetes service deployer + +The Kubernetes service deployer requires the `_dev/benchmark/system/deploy/k8s` directory to be present. It can include additional `*.yaml` files to deploy +custom applications in the Kubernetes cluster (e.g. Nginx deployment). It is possible to use a `kustomization.yaml` file.
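+
+A minimal `kustomization.yaml` could look like the following sketch (the `nginx-deployment.yaml` resource name is only an illustration, not a file shipped by elastic-package):
+
+```yaml
+apiVersion: kustomize.config.k8s.io/v1beta1
+kind: Kustomization
+resources:
+  - nginx-deployment.yaml
+```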
+If no resource definitions (`*.yaml` files) are needed, +the `_dev/benchmark/system/deploy/k8s` directory must contain an `.empty` file (to preserve the `k8s` directory under version control). + +The Kubernetes service deployer needs [kind](https://kind.sigs.k8s.io/) to be installed and the cluster to be up and running: +```bash +wget -qO- https://raw.githubusercontent.com/elastic/elastic-package/main/scripts/kind-config.yaml | kind create cluster --config - +``` + +Before executing system benchmarks, the service deployer applies the Elastic Agent deployment to the cluster once and links +the kind cluster with the Elastic stack network, so applications running in the kind cluster can reach Elasticsearch and Kibana instances. +To shorten the total execution time, the Elastic Agent's deployment is not deleted after the benchmarks, so it can be reused. + +See how to execute system benchmarks for the Kubernetes integration: + +```bash +elastic-package stack up -d -v # start the Elastic stack +wget -qO- https://raw.githubusercontent.com/elastic/elastic-package/main/scripts/kind-config.yaml | kind create cluster --config - +elastic-package benchmark system --benchmark k8s-benchmark -v +``` + +### Benchmark scenario definition + +Next, we must define at least one configuration for the package that we @@ -75,6 +232,7 @@ along with values for package and data stream-level variables. These are the ava | package | string | | The name of the package. If omitted will pick the current package, this is to allow for future definition of benchmarks outside of the packages folders. | | description | string | | A description for the scenario. | | version | string | | The version of the package to benchmark. If omitted will pick the current version of the package. | +| policy_template | string | | The policy template to test. If omitted will pick the first one. | | input | string | yes | Input type to test (e.g. logfile, httpjson, etc). Defaults to the input used by the first stream in the data stream manifest. | | vars | dictionary | | Package level variables to set (i.e. declared in `$package_root/manifest.yml`). If not specified the defaults from the manifest are used. | | data_stream.name | string | yes | The data stream to benchmark. | From de517ad556a1573c40e028d6781597fcb7981bec Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Tue, 4 Jul 2023 13:41:54 +0200 Subject: [PATCH 13/23] Use DevDeployDir where required --- cmd/service.go | 2 ++ internal/service/boot.go | 2 ++ internal/testrunner/runners/system/runner.go | 5 +++-- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/service.go b/cmd/service.go index 42f6aa7579..650401ea2e 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/elastic-package/internal/cobraext" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/service" + "github.com/elastic/elastic-package/internal/testrunner/runners/system" ) const serviceLongDescription = `Use this command to boot up the service stack that can be observed with the package.
@@ -62,6 +63,7 @@ func upCommandAction(cmd *cobra.Command, args []string) error { err = service.BootUp(service.Options{ ServiceName: serviceName, PackageRootPath: packageRoot, + DevDeployDir: system.DevDeployDir, DataStreamRootPath: dataStreamPath, Variant: variantFlag, }) diff --git a/internal/service/boot.go b/internal/service/boot.go index bca21d69d1..a77d05efbf 100644 --- a/internal/service/boot.go +++ b/internal/service/boot.go @@ -21,6 +21,7 @@ import ( type Options struct { ServiceName string PackageRootPath string + DevDeployDir string DataStreamRootPath string Variant string @@ -32,6 +33,7 @@ func BootUp(options Options) error { serviceDeployer, err := servicedeployer.Factory(servicedeployer.FactoryOptions{ PackageRootPath: options.DataStreamRootPath, DataStreamRootPath: options.DataStreamRootPath, + DevDeployDir: options.DevDeployDir, Variant: options.Variant, }) if err != nil { diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go index 761b064f0e..aa17cf3001 100644 --- a/internal/testrunner/runners/system/runner.go +++ b/internal/testrunner/runners/system/runner.go @@ -35,7 +35,7 @@ const ( testRunMinID = 10000 allFieldsBody = `{"fields": ["*"]}` - devDeployDir = "_dev/deploy" + DevDeployDir = "_dev/deploy" ) func init() { @@ -201,7 +201,7 @@ func (r *runner) run() (results []testrunner.TestResult, err error) { devDeployPath, err := servicedeployer.FindDevDeployPath(servicedeployer.FactoryOptions{ PackageRootPath: r.options.PackageRootPath, DataStreamRootPath: dataStreamPath, - DevDeployDir: devDeployDir, + DevDeployDir: DevDeployDir, }) if err != nil { return result.WithError(fmt.Errorf("_dev/deploy directory not found: %w", err)) @@ -253,6 +253,7 @@ func (r *runner) runTestPerVariant(result *testrunner.ResultComposer, locationMa serviceOptions := servicedeployer.FactoryOptions{ PackageRootPath: r.options.PackageRootPath, DataStreamRootPath: dataStreamPath, + DevDeployDir: DevDeployDir, Variant: variantName, Type: servicedeployer.TypeTest, } From 604e3f7c43695196122ea56349897ec842a68021 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 5 Jul 2023 13:26:22 +0200 Subject: [PATCH 14/23] Exit appropriately when timeout is exceeded --- internal/benchrunner/runner.go | 5 ----- internal/benchrunner/runners/system/runner.go | 12 +++++++----- .../_dev/benchmark/system/logs-benchmark.yml | 1 + 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/internal/benchrunner/runner.go b/internal/benchrunner/runner.go index c356bb48b5..13e939957f 100644 --- a/internal/benchrunner/runner.go +++ b/internal/benchrunner/runner.go @@ -28,11 +28,6 @@ func Run(runner Runner) (reporters.Reportable, error) { } defer func() { - // we want to ensure correct tear down of the benchmark in any situation - if rerr := recover(); rerr != nil { - logger.Errorf("panic occurred: %v", rerr) - } - tdErr := runner.TearDown() if tdErr != nil { logger.Errorf("could not teardown benchmark runner: %v", tdErr) } }() diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index c096290623..776dccdca4 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -284,9 +284,13 @@ func (r *runner) run() (report reporters.Reportable, err error) { } } - if err := r.waitUntilBenchmarkFinishes(); err != nil { + finishedOnTime, err := r.waitUntilBenchmarkFinishes() + if err != nil { return nil, err } + if !finishedOnTime { + return nil, errors.New("timeout exceeded") + }
msum, err := r.collectAndSummarizeMetrics() if err != nil { @@ -389,7 +393,6 @@ func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *k r.scenario.PolicyTemplate = pkgManifest.PolicyTemplates[0].Name } - // TODO: add ability to define which policy template to use pp := kibana.PackagePolicy{ Namespace: "ep", PolicyID: p.ID, @@ -615,7 +618,7 @@ func (r *runner) checkEnrolledAgents() ([]kibana.Agent, error) { return agents, nil } -func (r *runner) waitUntilBenchmarkFinishes() error { +func (r *runner) waitUntilBenchmarkFinishes() (bool, error) { logger.Debug("checking for all data in data stream...") var benchTime *time.Timer if r.scenario.BenchmarkTimePeriod > 0 { @@ -623,7 +626,7 @@ func (r *runner) waitUntilBenchmarkFinishes() error { } oldHits := 0 - _, err := waitUntilTrue(func() (bool, error) { + return waitUntilTrue(func() (bool, error) { if signal.SIGINT() { return true, errors.New("SIGINT: cancel waiting for policy assigned") } @@ -650,7 +653,6 @@ func (r *runner) waitUntilBenchmarkFinishes() error { return ret, err }, *r.scenario.WaitForDataTimeout) - return err } func (r *runner) enrollAgents() error { diff --git a/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml b/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml index 1673942093..d466953809 100644 --- a/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml +++ b/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml @@ -6,6 +6,7 @@ data_stream.name: testds data_stream.vars.paths: - "{{SERVICE_LOGS_DIR}}/corpus-*" warmup_time_period: 10s +wait_for_data_timeout: 1m corpora.generator.size: 20MiB corpora.generator.template.path: ./logs-benchmark/template.log corpora.generator.config.path: ./logs-benchmark/config.yml From 60bc74fa051d79ed6ab418695a245333017c385d Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 6 Jul 2023 15:21:04 +0200 Subject: [PATCH 15/23] Set corpus file permissions --- internal/benchrunner/runners/system/runner.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 776dccdca4..b38bdacd05 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -556,6 +556,10 @@ func (r *runner) runGenerator(destDir string) error { } defer f.Close() + if err := f.Chmod(os.ModePerm); err != nil { + return err + } + buf := bytes.NewBufferString("") var corpusDocsCount uint64 for { From 5fa9987d83067594722e8f7030aa3f9a3f7d79ac Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Fri, 7 Jul 2023 11:14:23 +0200 Subject: [PATCH 16/23] Add metricstore docs --- docs/howto/system_benchmarking.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/howto/system_benchmarking.md b/docs/howto/system_benchmarking.md index 6d2655708a..e5027a2a06 100644 --- a/docs/howto/system_benchmarking.md +++ b/docs/howto/system_benchmarking.md @@ -401,3 +401,20 @@ Finally, when you are done running the benchmark, bring down the Elastic Stack. elastic-package stack down ``` +## Setting up an external metricstore + +A metricstore can be set up to send metrics collected during the benchmark execution. 
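+
+Any reachable Elasticsearch cluster can act as the metricstore. Before running the benchmark it can be worth checking that the target cluster accepts the credentials you plan to use; a minimal sketch, where host, credentials and CA path are placeholders matching the examples below:
+
+```bash
+curl -u elastic:changeme \
+  --cacert "$HOME/.elastic-package/profiles/default/certs/ca-cert.pem" \
+  https://127.0.0.1:9200
+```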
+In order to initialize it, you need to set up the following environment variables: + +```bash +export ELASTIC_PACKAGE_ESMETRICSTORE_HOST=https://127.0.0.1:9200 +export ELASTIC_PACKAGE_ESMETRICSTORE_USERNAME=elastic +export ELASTIC_PACKAGE_ESMETRICSTORE_PASSWORD=changeme +export ELASTIC_PACKAGE_ESMETRICSTORE_CA_CERT="$HOME/.elastic-package/profiles/default/certs/ca-cert.pem" +``` + +The only one that is optional is `ELASTIC_PACKAGE_ESMETRICSTORE_CA_CERT`. + +When these are detected, metrics will be automatically collected every second and sent to a new index called `bench-metrics-{dataset}-{testRunID}`. + +Additionally, if the `reindex-to-metricstore` flag is used, the data generated during the benchmark will be sent to the metricstore into an index called `bench-reindex-{datastream}-{testRunID}` for further analysis. The events will be enriched with metadata related to the benchmark run. \ No newline at end of file From 4e6e9631f0b6fa31f54f7c2780da53ab5f144103 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 21 Aug 2023 12:21:46 +0200 Subject: [PATCH 17/23] Fix error message --- internal/servicedeployer/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/servicedeployer/factory.go b/internal/servicedeployer/factory.go index 062c5a5d90..00021017be 100644 --- a/internal/servicedeployer/factory.go +++ b/internal/servicedeployer/factory.go @@ -93,7 +93,7 @@ func FindDevDeployPath(options FactoryOptions) (string, error) { return "", fmt.Errorf("stat failed for package (path: %s): %w", packageDevDeployPath, err) } - return "", fmt.Errorf("%q directory doesn't exist", options.DevDeployDir) + return "", fmt.Errorf("\"%s\" directory doesn't exist", options.DevDeployDir) } func findServiceDeployer(devDeployPath string) (string, error) { From 4a974ebb469922e5d123b6bd653ebea2036f3fda Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 21 Aug 2023 12:22:15 +0200 Subject: [PATCH 18/23] Improve documentation --- docs/howto/sample_metric.json | 670 ++++++++++++++++++++++++++++++ docs/howto/system_benchmarking.md | 15 +- 2 files changed, 684 insertions(+), 1 deletion(-) create mode 100644 docs/howto/sample_metric.json diff --git a/docs/howto/sample_metric.json b/docs/howto/sample_metric.json new file mode 100644 index 0000000000..bc1bdaf612 --- /dev/null +++ b/docs/howto/sample_metric.json @@ -0,0 +1,670 @@ +{ + "@timestamp": 1692610780000, + "cluster_name": "elasticsearch", + "node_name": "M_lyy0n9TSOjGt3kPhnM8Q", + "Breakers": { + "eql_sequence": { + "limit_size_in_bytes": 536870912, + "estimated_size_in_bytes": 0, + "overhead": 1, + "tripped": 0 + }, + "fielddata": { + "limit_size_in_bytes": 429496729, + "estimated_size_in_bytes": 2456, + "overhead": 1.03, + "tripped": 0 + }, + "inflight_requests": { + "limit_size_in_bytes": 1073741824, + "estimated_size_in_bytes": 0, + "overhead": 2, + "tripped": 0 + }, + "model_inference": { + "limit_size_in_bytes": 536870912, + "estimated_size_in_bytes": 0, + "overhead": 1, + "tripped": 0 + }, + "parent": { + "limit_size_in_bytes": 1020054732, + "estimated_size_in_bytes": 349638992, + "overhead": 1, + "tripped": 0 + }, + "request": { + "limit_size_in_bytes": 644245094, + "estimated_size_in_bytes": 0, + "overhead": 1, + "tripped": 0 + } + }, + "indices": { + "docs": { + "count": 502514, + "deleted": 101 + }, + "shard_stats": { + "total_count": 54 + }, + "store": { + "size_in_bytes": 231344895, + "total_data_set_size_in_bytes": 231344895, + "reserved_in_bytes": 0 + }, + "indexing": { + "index_total": 627678,
"index_time_in_millis": 192856, + "index_current": 0, + "index_failed": 43, + "delete_total": 80, + "delete_time_in_millis": 37, + "delete_current": 0, + "noop_update_total": 0, + "is_throttled": false, + "throttle_time_in_millis": 0, + "write_load": 0.00001370894459913557 + }, + "get": { + "total": 171437, + "time_in_millis": 21970, + "exists_total": 168394, + "exists_time_in_millis": 21727, + "missing_total": 3043, + "missing_time_in_millis": 243, + "current": 0 + }, + "search": { + "open_contexts": 0, + "query_total": 200120, + "query_time_in_millis": 98784, + "query_current": 0, + "fetch_total": 200076, + "fetch_time_in_millis": 8849, + "fetch_current": 0, + "scroll_total": 96697, + "scroll_time_in_millis": 10241728, + "scroll_current": 0, + "suggest_total": 0, + "suggest_time_in_millis": 0, + "suggest_current": 0 + }, + "merges": { + "current": 0, + "current_docs": 0, + "current_size_in_bytes": 0, + "total": 11503, + "total_time_in_millis": 187989, + "total_docs": 6782259, + "total_size_in_bytes": 2574092233, + "total_stopped_time_in_millis": 0, + "total_throttled_time_in_millis": 0, + "total_auto_throttle_in_bytes": 1132462080 + }, + "refresh": { + "total": 122510, + "total_time_in_millis": 905546, + "external_total": 77930, + "external_total_time_in_millis": 970183, + "listeners": 0 + }, + "flush": { + "total": 44585, + "periodic": 44585, + "total_time_in_millis": 2077597 + }, + "warmer": { + "current": 0, + "total": 77718, + "total_time_in_millis": 22729 + }, + "query_cache": { + "memory_size_in_bytes": 0, + "total_count": 0, + "hit_count": 0, + "miss_count": 0, + "cache_size": 0, + "cache_count": 0, + "evictions": 0 + }, + "fielddata": { + "memory_size_in_bytes": 2456, + "evictions": 0 + }, + "completion": { + "size_in_bytes": 0 + }, + "segments": { + "count": 743, + "memory_in_bytes": 0, + "terms_memory_in_bytes": 0, + "stored_fields_memory_in_bytes": 0, + "term_vectors_memory_in_bytes": 0, + "norms_memory_in_bytes": 0, + "points_memory_in_bytes": 0, + "doc_values_memory_in_bytes": 0, + "index_writer_memory_in_bytes": 2325280, + "version_map_memory_in_bytes": 348, + "fixed_bit_set_memory_in_bytes": 672, + "max_unsafe_auto_id_timestamp": 1691678283104, + "file_sizes": {} + }, + "translog": { + "operations": 219, + "size_in_bytes": 363972, + "uncommitted_operations": 219, + "uncommitted_size_in_bytes": 363972, + "earliest_last_modified_age": 694 + }, + "request_cache": { + "memory_size_in_bytes": 51488, + "evictions": 0, + "hit_count": 6830, + "miss_count": 1177 + }, + "recovery": { + "current_as_source": 0, + "current_as_target": 0, + "throttle_time_in_millis": 0 + }, + "bulk": { + "total_operations": 336226, + "total_time_in_millis": 222470, + "total_size_in_bytes": 967279105, + "avg_time_in_millis": 0, + "avg_size_in_bytes": 2840 + }, + "mappings": { + "total_count": 11497, + "total_estimated_overhead_in_bytes": 11772928 + } + }, + "jvm": { + "mem": { + "heap_used_in_bytes": 345444688, + "heap_used_percent": 32, + "heap_committed_in_bytes": 1073741824, + "heap_max_in_bytes": 1073741824, + "non_heap_used_in_bytes": 260239608, + "non_heap_committed_in_bytes": 309788672, + "pools": { + "old": { + "used_in_bytes": 199928320, + "max_in_bytes": 1073741824, + "peak_used_in_bytes": 214294528, + "peak_max_in_bytes": 1073741824 + }, + "survivor": { + "used_in_bytes": 32270160, + "max_in_bytes": 0, + "peak_used_in_bytes": 83886080, + "peak_max_in_bytes": 0 + }, + "young": { + "used_in_bytes": 113246208, + "max_in_bytes": 0, + "peak_used_in_bytes": 629145600, + "peak_max_in_bytes": 0 + } + 
} + }, + "gc": { + "collectors": { + "G1 Concurrent GC": { + "collection_count": 604, + "collection_time_in_millis": 5896 + }, + "old": { + "collection_count": 0, + "collection_time_in_millis": 0 + }, + "young": { + "collection_count": 1511, + "collection_time_in_millis": 17790 + } + } + }, + "buffer_pools": { + "direct": { + "count": 78, + "used_in_bytes": 10105958, + "total_capacity_in_bytes": 10105956 + }, + "mapped": { + "count": 761, + "used_in_bytes": 229499619, + "total_capacity_in_bytes": 229499619 + }, + "mapped - 'non-volatile memory'": { + "count": 0, + "used_in_bytes": 0, + "total_capacity_in_bytes": 0 + } + } + }, + "os": { + "mem": { + "total_in_bytes": 12560445440, + "adjusted_total_in_bytes": 12560445440, + "free_in_bytes": 4413337600, + "used_in_bytes": 8147107840, + "free_percent": 35, + "used_percent": 65 + } + }, + "process": { + "cpu": { + "percent": 0, + "total_in_millis": 9628610 + } + }, + "thread_pool": { + "analyze": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "auto_complete": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "azure_event_loop": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "ccr": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "cluster_coordination": { + "threads": 1, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 1, + "completed": 2147 + }, + "fetch_shard_started": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "fetch_shard_store": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "flush": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 44597 + }, + "force_merge": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "generic": { + "threads": 9, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 17, + "completed": 775114 + }, + "get": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "management": { + "threads": 5, + "queue": 0, + "active": 1, + "rejected": 0, + "largest": 5, + "completed": 628761 + }, + "ml_datafeed": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "ml_job_comms": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "ml_native_inference_comms": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "ml_utility": { + "threads": 2, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 2, + "completed": 284235 + }, + "profiling": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "refresh": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 6736640 + }, + "repository_azure": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "rollup_indexing": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "search": { + "threads": 13, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 13, + "completed": 506 + }, + "search_coordination": { + "threads": 0, + "queue": 0, + 
"active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "search_throttled": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "searchable_snapshots_cache_fetch_async": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "searchable_snapshots_cache_prewarming": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "security-crypto": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 9 + }, + "security-token-key": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "snapshot": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "snapshot_meta": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "system_critical_read": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 1581 + }, + "system_critical_write": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 180 + }, + "system_read": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 409012 + }, + "system_write": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 104814 + }, + "vector_tile_generation": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "warmer": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 342944 + }, + "watcher": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "write": { + "threads": 8, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 8, + "completed": 868562 + } + }, + "transport": { + "server_open": 0, + "total_outbound_connections": 0, + "rx_count": 0, + "rx_size_in_bytes": 0, + "tx_count": 0, + "tx_size_in_bytes": 0, + "inbound_handling_time_histogram": [ + { + "ge_millis": 0, + "lt_millis": 1, + "count": 273891 + }, + { + "ge_millis": 1, + "lt_millis": 2, + "count": 274801 + }, + { + "ge_millis": 2, + "lt_millis": 4, + "count": 5713 + }, + { + "ge_millis": 4, + "lt_millis": 8, + "count": 834 + }, + { + "ge_millis": 8, + "lt_millis": 16, + "count": 488 + }, + { + "ge_millis": 16, + "lt_millis": 32, + "count": 385 + }, + { + "ge_millis": 32, + "lt_millis": 64, + "count": 161 + }, + { + "ge_millis": 64, + "lt_millis": 128, + "count": 110 + }, + { + "ge_millis": 128, + "lt_millis": 256, + "count": 68 + }, + { + "ge_millis": 256, + "lt_millis": 512, + "count": 30 + }, + { + "ge_millis": 512, + "lt_millis": 1024, + "count": 2 + } + ], + "outbound_handling_time_histogram": [] + }, + "benchmark_metadata": { + "info": { + "benchmark": "logs-benchmark", + "run_id": "a317f7d4-97a2-4ddd-a537-faa6433e62fb" + }, + "parameter": { + "package": "system_benchmarks", + "description": "Benchmark 20MiB of data ingested", + "version": "999.999.999", + "policy_template": "testpo", + "input": "filestream", + "vars": null, + "data_stream": { + "name": "testds", + "vars": { + "paths": [ + "/tmp/service_logs/corpus-*" + ] + } + }, + "warmup_time_period": 10000000000, + "benchmark_time_period": 0, + "wait_for_data_timeout": 60000000000, + "corpora": { + "generator": { + "size": "20MiB", + "template": { + "raw": "", + "path": 
"./logs-benchmark/template.log", + "type": "" + }, + "config": { + "raw": null, + "path": "./logs-benchmark/config.yml" + }, + "fields": { + "raw": null, + "path": "./logs-benchmark/fields.yml" + } + }, + "input_service": null + } + } + } +} \ No newline at end of file diff --git a/docs/howto/system_benchmarking.md b/docs/howto/system_benchmarking.md index e5027a2a06..77c50da845 100644 --- a/docs/howto/system_benchmarking.md +++ b/docs/howto/system_benchmarking.md @@ -14,7 +14,7 @@ Conceptually, running a system benchmark involves the following steps: 1. Assign the policy to the enrolled Agent(s). 1. Metrics collections from the cluster starts. (**TODO**: record metrics from all Elastic Agents involved using the `system` integration.) 1. Send the collected metrics to the ES Metricstore if set. -1. Generate data if configured (it uses the [corpus-generator-rool](https://github.com/elastic/elastic-integration-corpus-generator-tool)) +1. Generate data if configured (it uses the [corpus-generator-tool](https://github.com/elastic/elastic-integration-corpus-generator-tool)) 1. Wait a reasonable amount of time for the Agent to collect data from the integration service and index it into the correct Elasticsearch data stream. This time can be pre-defined with the `benchmark_time`. In case this setting is not set @@ -404,6 +404,13 @@ elastic-package stack down ## Setting up an external metricstore A metricstore can be set up to send metrics collected during the benchmark execution. + +An external metricstore might be useful for: + +- Store monitoring data of the benchmark scenario for all its execution time. +- Analyse the data generated during a benchmark. This is possible when using the `reindex-to-metricstore` flag. +- **TODO**: Store benchmark results for various benchmark runs permanently for later comparison. + In order to initialize it, you need to set up the following environment variables: ```bash @@ -417,4 +424,10 @@ The only one that is optional is `ELASTIC_PACKAGE_ESMETRICSTORE_CA_CERT`. When these are detected, metrics will be automatically collected every second and sent to a new index called `bench-metrics-{dataset}-{testRunID}"`. +The collected metrics include the following node stats: `nodes.*.breakers`, `nodes.*.indices`, `nodes.*.jvm.mem`, `nodes.*.jvm.gc`, `nodes.*.jvm.buffer_pools`, `nodes.*.os.mem`, `nodes.*.process.cpu`, `nodes.*.thread_pool`, and `nodes.*.transport`. + +Ingest pipelines metrics are only collected at the end since its own collection would affect the benchmark results. + +You can see a sample collected metric [here](./sample_metric.json) + Additionally, if the `reindex-to-metricstore` flag is used, the data generated during the benchmark will be sent to the metricstore into an index called `bench-reindex-{datastream}-{testRunID}` for further analysis. The events will be enriched with metadata related to the benchmark run. 
\ No newline at end of file

From c3e5ef9d5bf1c2447faf03935e5d5d5ada3083b6 Mon Sep 17 00:00:00 2001
From: Marc Guasch
Date: Mon, 21 Aug 2023 12:22:49 +0200
Subject: [PATCH 19/23] Rename field and extract index template

---
 .../benchrunner/runners/system/metrics.go     | 73 +++++++------------
 .../runners/system/metrics_index.json         | 24 ++++++
 2 files changed, 51 insertions(+), 46 deletions(-)
 create mode 100644 internal/benchrunner/runners/system/metrics_index.json

diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go
index 399c2444ac..3743d10d99 100644
--- a/internal/benchrunner/runners/system/metrics.go
+++ b/internal/benchrunner/runners/system/metrics.go
@@ -6,6 +6,7 @@ package system
 
 import (
 	"bytes"
+	_ "embed"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -33,8 +34,8 @@ type collector struct {
 	scenario scenario
 
 	interval   time.Duration
-	esapi      *elasticsearch.API
-	msapi      *elasticsearch.API
+	esAPI      *elasticsearch.API
+	metricsAPI *elasticsearch.API
 
 	datastream     string
 	pipelinePrefix string
@@ -75,7 +76,7 @@ func newCollector(
 	ctxt servicedeployer.ServiceContext,
 	benchName string,
 	scenario scenario,
-	esapi, msapi *elasticsearch.API,
+	esAPI, metricsAPI *elasticsearch.API,
 	interval time.Duration,
 	datastream, pipelinePrefix string,
 ) *collector {
@@ -87,8 +88,8 @@ func newCollector(
 		interval:       interval,
 		scenario:       scenario,
 		metadata:       meta,
-		esapi:          esapi,
-		msapi:          msapi,
+		esAPI:          esAPI,
+		metricsAPI:     metricsAPI,
 		datastream:     datastream,
 		pipelinePrefix: pipelinePrefix,
 		stopC:          make(chan struct{}),
@@ -139,14 +140,14 @@ func (c *collector) collect() metrics {
 		ts: time.Now().Unix(),
 	}
 
-	nstats, err := ingest.GetNodesStats(c.esapi)
+	nstats, err := ingest.GetNodesStats(c.esAPI)
 	if err != nil {
 		logger.Debug(err)
 	} else {
 		m.nMetrics = nstats
 	}
 
-	dsstats, err := ingest.GetDataStreamStats(c.esapi, c.datastream)
+	dsstats, err := ingest.GetDataStreamStats(c.esAPI, c.datastream)
 	if err != nil {
 		logger.Debug(err)
 	} else {
@@ -157,12 +158,12 @@ func (c *collector) publish(events [][]byte) {
-	if c.msapi == nil {
+	if c.metricsAPI == nil {
 		return
 	}
 	for _, e := range events {
 		reqBody := bytes.NewReader(e)
-		resp, err := c.msapi.Index(c.indexName(), reqBody)
+		resp, err := c.metricsAPI.Index(c.indexName(), reqBody)
 		if err != nil {
 			logger.Debugf("error indexing event: %v", err)
 			continue
 		}
@@ -172,6 +173,7 @@ func (c *collector) publish(events [][]byte) {
 		if err != nil {
 			logger.Errorf("failed to read index response body: %v", err)
 		}
+		resp.Body.Close()
 		if resp.StatusCode != 201 {
 			logger.Errorf("error indexing event (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body))
 		}
 	}
 }
 
+//go:embed metrics_index.json
+var metricsIndexBytes []byte
+
 func (c *collector) createMetricsIndex() {
-	if c.msapi == nil {
+	if c.metricsAPI == nil {
 		return
 	}
-	reader := bytes.NewReader(
-		[]byte(`{
-  "settings": {
-    "number_of_replicas": 0
-  },
-  "mappings": {
-    "dynamic_templates": [
-      {
-        "strings_as_keyword": {
-          "match_mapping_type": "string",
-          "mapping": {
-            "ignore_above": 1024,
-            "type": "keyword"
-          }
-        }
-      }
-    ],
-    "date_detection": false,
-    "properties": {
-      "@timestamp": {
-        "type": "date"
-      }
-    }
-  }
-}`),
-	)
+
+	reader := bytes.NewReader(metricsIndexBytes)
 
 	logger.Debugf("creating %s index in metricstore...", c.indexName())
 
-	createRes, err := c.msapi.Indices.Create(
+	createRes, err := c.metricsAPI.Indices.Create(
 		c.indexName(),
-		c.msapi.Indices.Create.WithBody(reader),
+		c.metricsAPI.Indices.Create.WithBody(reader),
 	)
 	if err != nil {
 		logger.Debugf("could not create index: %v", err)
@@ -300,7 +281,7 @@ readyLoop:
 			return
 		case <-waitTick.C:
 		}
-		dsstats, err := ingest.GetDataStreamStats(c.esapi, c.datastream)
+		dsstats, err := ingest.GetDataStreamStats(c.esAPI, c.datastream)
 		if err != nil {
 			logger.Debug(err)
 		}
@@ -321,18 +302,18 @@ readyLoop:
 }
 
 func (c *collector) collectIngestMetrics() map[string]ingest.PipelineStatsMap {
-	ipMetrics, err := ingest.GetPipelineStatsByPrefix(c.esapi, c.pipelinePrefix)
+	ipMetrics, err := ingest.GetPipelineStatsByPrefix(c.esAPI, c.pipelinePrefix)
 	if err != nil {
-		logger.Debugf("could not get ingest pipeline metrics: %w", err)
+		logger.Debugf("could not get ingest pipeline metrics: %v", err)
 		return nil
 	}
 	return ipMetrics
 }
 
 func (c *collector) collectDiskUsage() map[string]ingest.DiskUsage {
-	du, err := ingest.GetDiskUsage(c.esapi, c.datastream)
+	du, err := ingest.GetDiskUsage(c.esAPI, c.datastream)
 	if err != nil {
-		logger.Debugf("could not get disk usage metrics: %w", err)
+		logger.Debugf("could not get disk usage metrics: %v", err)
 		return nil
 	}
 	return du
@@ -346,7 +327,7 @@ func (c *collector) collectMetricsPreviousToStop() {
 }
 
 func (c *collector) collectTotalHits() int {
-	totalHits, err := getTotalHits(c.esapi, c.datastream)
+	totalHits, err := getTotalHits(c.esAPI, c.datastream)
 	if err != nil {
 		logger.Debugf("could not total hits: %w", err)
 	}
@@ -355,11 +336,11 @@ func (c *collector) createEventsFromMetrics(m metrics) [][]byte {
 	dsEvent := struct {
-		Ts int64 `json:"@timestamp"`
+		Timestamp int64 `json:"@timestamp"`
 		*ingest.DataStreamStats
 		Meta benchMeta `json:"benchmark_metadata"`
 	}{
-		Ts:              m.ts * 1000, // ms to s
+		Timestamp:       m.ts * 1000, // s to ms
 		DataStreamStats: m.dsMetrics,
 		Meta:            c.metadata,
 	}
diff --git a/internal/benchrunner/runners/system/metrics_index.json b/internal/benchrunner/runners/system/metrics_index.json
new file mode 100644
index 0000000000..5d4e724da0
--- /dev/null
+++ b/internal/benchrunner/runners/system/metrics_index.json
@@ -0,0 +1,24 @@
+{
+  "settings": {
+    "number_of_replicas": 0
+  },
+  "mappings": {
+    "dynamic_templates": [
+      {
+        "strings_as_keyword": {
+          "match_mapping_type": "string",
+          "mapping": {
+            "ignore_above": 1024,
+            "type": "keyword"
+          }
+        }
+      }
+    ],
+    "date_detection": false,
+    "properties": {
+      "@timestamp": {
+        "type": "date"
+      }
+    }
+  }
+}
\ No newline at end of file

From 208d848181d75ff42111d51163c0ff5c12eb5d77 Mon Sep 17 00:00:00 2001
From: Marc Guasch
Date: Mon, 21 Aug 2023 12:23:20 +0200
Subject: [PATCH 20/23] Add types and comment to reindex function

---
 internal/benchrunner/runners/system/runner.go | 40 +++++++++++--------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go
index cf494c92ed..e8be30e0c4 100644
--- a/internal/benchrunner/runners/system/runner.go
+++ b/internal/benchrunner/runners/system/runner.go
@@ -709,6 +709,7 @@ func (r *runner) enrollAgents() error {
 	return nil
 }
 
+// reindexData reads all the data generated during the benchmark and reindexes it into the metricstore.
 func (r *runner) reindexData() error {
 	if !r.options.ReindexData {
 		return nil
@@ -719,7 +720,7 @@ func (r *runner) reindexData() error {
 
 	logger.Debug("starting reindexing of data...")
 
-	logger.Debug("gettings orignal mappings...")
+	logger.Debug("getting original mappings...")
 	// Get the mapping from the source data stream
 	mappingRes, err := r.options.ESAPI.Indices.GetMapping(
 		r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream),
 	)
@@ -789,28 +790,36 @@ func (r *runner) reindexData() error {
 	}
 	defer res.Body.Close()
 
+	type searchRes struct {
+		Error *struct {
+			Reason string `json:"reason"`
+		} `json:"error"`
+		ScrollID string `json:"_scroll_id"`
+		Hits     struct {
+			Hits []struct {
+				ID     string                 `json:"_id"`
+				Source map[string]interface{} `json:"_source"`
+			} `json:"hits"`
+		} `json:"hits"`
+	}
+
 	// Iterate through the search results using the Scroll API
 	for {
-		var sr map[string]interface{}
+		var sr searchRes
 		if err := json.NewDecoder(res.Body).Decode(&sr); err != nil {
 			return fmt.Errorf("error decoding search response: %w", err)
 		}
 
-		resErr, found := sr["error"]
-		if found {
-			errStr := resErr.(map[string]interface{})["reason"].(string)
-			return fmt.Errorf("error searching for documents: %s", errStr)
+		if sr.Error != nil {
+			return fmt.Errorf("error searching for documents: %s", sr.Error.Reason)
 		}
 
-		hits, found := sr["hits"].(map[string]interface{})["hits"].([]interface{})
-		if !found || len(hits) == 0 {
+		if len(sr.Hits.Hits) == 0 {
 			break
 		}
 
 		var bulkBodyBuilder strings.Builder
-		for _, hit := range hits {
-			bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.(map[string]interface{})["_id"]))
-			enriched := r.enrichEventWithBenchmarkMetadata(hit.(map[string]interface{})["_source"].(map[string]interface{}))
+		for _, hit := range sr.Hits.Hits {
+			bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID))
+			enriched := r.enrichEventWithBenchmarkMetadata(hit.Source)
 			src, err := json.Marshal(enriched)
 			if err != nil {
 				return fmt.Errorf("error decoding _source: %w", err)
@@ -818,7 +827,7 @@ func (r *runner) reindexData() error {
 			bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src)))
 		}
 
-		logger.Debugf("bulk request of %d events...", len(hits))
+		logger.Debugf("bulk request of %d events...", len(sr.Hits.Hits))
 
 		bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String()))
 		if err != nil {
@@ -826,19 +835,18 @@ func (r *runner) reindexData() error {
 		}
 		bulkRes.Body.Close()
 
-		scrollId, found := sr["_scroll_id"].(string)
-		if !found {
+		if sr.ScrollID == "" {
 			return errors.New("error getting scroll ID")
 		}
 
 		res, err = r.options.ESAPI.Scroll(
-			r.options.ESAPI.Scroll.WithScrollID(scrollId),
+			r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID),
 			r.options.ESAPI.Scroll.WithScroll(time.Minute),
 		)
 		if err != nil {
 			return fmt.Errorf("error executing scroll: %s", err)
 		}
-		defer res.Body.Close()
+		res.Body.Close()
 	}
 
 	logger.Debug("reindexing operation finished")

From 1ec12e81131c517e1034b0a95f54921370a30e21 Mon Sep 17 00:00:00 2001
From: Marc Guasch
Date: Mon, 21 Aug 2023 12:23:36 +0200
Subject: [PATCH 21/23] Remove unused fields from node stats

---
 internal/elasticsearch/ingest/nodestats.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/internal/elasticsearch/ingest/nodestats.go b/internal/elasticsearch/ingest/nodestats.go
index f827759aa7..4b9da276bf 100644
--- a/internal/elasticsearch/ingest/nodestats.go
+++ b/internal/elasticsearch/ingest/nodestats.go
@@ -199,9 +199,7 @@ type NodesStats struct {
 type NodeStats struct {
 	Breakers map[string]struct {
 		LimitSizeInBytes     int     `json:"limit_size_in_bytes"`
-		LimitSize            string  `json:"limit_size"`
 		EstimatedSizeInBytes int     `json:"estimated_size_in_bytes"`
-		EstimatedSize        string  `json:"estimated_size"`
 		Overhead             float64 `json:"overhead"`
 		Tripped              int     `json:"tripped"`
 	}
From 63c5702ae79cae54c452336ac2228da71775c935 Mon Sep 17 00:00:00 2001
From: Marc Guasch
Date: Tue, 22 Aug 2023 10:59:12 +0200
Subject: [PATCH 22/23] Use stack method for ES client creation

---
 cmd/benchmark.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmd/benchmark.go b/cmd/benchmark.go
index 66a2e2fc40..b51d6e14ea 100644
--- a/cmd/benchmark.go
+++ b/cmd/benchmark.go
@@ -411,7 +411,7 @@ func initializeESMetricsClient(ctx context.Context) (*elasticsearch.Client, erro
 		return nil, nil
 	}
 
-	esClient, err := elasticsearch.NewClient(
+	esClient, err := stack.NewElasticsearchClient(
 		elasticsearch.OptionWithAddress(address),
 		elasticsearch.OptionWithUsername(user),
 		elasticsearch.OptionWithPassword(pass),

From 2a3f889e133a02cc79845532a380c30a8555b9db Mon Sep 17 00:00:00 2001
From: Marc Guasch
Date: Tue, 22 Aug 2023 11:43:58 +0200
Subject: [PATCH 23/23] Increase test data timeout

---
 .../system_benchmark/_dev/benchmark/system/logs-benchmark.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml b/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml
index d466953809..56d25b1571 100644
--- a/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml
+++ b/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml
@@ -6,7 +6,7 @@ data_stream.name: testds
 data_stream.vars.paths:
   - "{{SERVICE_LOGS_DIR}}/corpus-*"
 warmup_time_period: 10s
-wait_for_data_timeout: 1m
+wait_for_data_timeout: 10m
 corpora.generator.size: 20MiB
 corpora.generator.template.path: ./logs-benchmark/template.log
 corpora.generator.config.path: ./logs-benchmark/config.yml