diff --git a/cmd/benchmark.go b/cmd/benchmark.go index 065295e8ff..b51d6e14ea 100644 --- a/cmd/benchmark.go +++ b/cmd/benchmark.go @@ -5,15 +5,19 @@ package cmd import ( + "context" "errors" "fmt" + "os" "strings" "time" "github.com/dustin/go-humanize" "github.com/elastic/elastic-package/internal/corpusgenerator" + "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/install" + "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/stack" "github.com/spf13/cobra" @@ -215,6 +219,7 @@ func getSystemCommand() *cobra.Command { cmd.Flags().BoolP(cobraext.BenchReindexToMetricstoreFlagName, "", false, cobraext.BenchReindexToMetricstoreFlagDescription) cmd.Flags().DurationP(cobraext.BenchMetricsIntervalFlagName, "", time.Second, cobraext.BenchMetricsIntervalFlagDescription) cmd.Flags().DurationP(cobraext.DeferCleanupFlagName, "", 0, cobraext.DeferCleanupFlagDescription) + cmd.Flags().String(cobraext.VariantFlagName, "", cobraext.VariantFlagDescription) return cmd } @@ -222,6 +227,11 @@ func getSystemCommand() *cobra.Command { func systemCommandAction(cmd *cobra.Command, args []string) error { cmd.Println("Run system benchmarks for the package") + variant, err := cmd.Flags().GetString(cobraext.VariantFlagName) + if err != nil { + return cobraext.FlagParsingError(err, cobraext.VariantFlagName) + } + benchName, err := cmd.Flags().GetString(cobraext.BenchNameFlagName) if err != nil { return cobraext.FlagParsingError(err, cobraext.BenchNameFlagName) @@ -271,7 +281,8 @@ func systemCommandAction(cmd *cobra.Command, args []string) error { return fmt.Errorf("can't create Kibana client: %w", err) } - opts := system.NewOptions( + withOpts := []system.OptionFunc{ + system.WithVariant(variant), system.WithBenchmarkName(benchName), system.WithDeferCleanup(deferCleanup), system.WithMetricsInterval(metricsInterval), @@ -280,8 +291,17 @@ func systemCommandAction(cmd *cobra.Command, args []string) 
error { system.WithESAPI(esClient.API), system.WithKibanaClient(kc), system.WithProfile(profile), - ) - runner := system.NewSystemBenchmark(opts) + } + + esMetricsClient, err := initializeESMetricsClient(cmd.Context()) + if err != nil { + return fmt.Errorf("can't create Elasticsearch metrics client: %w", err) + } + if esMetricsClient != nil { + withOpts = append(withOpts, system.WithESMetricsAPI(esMetricsClient.API)) + } + + runner := system.NewSystemBenchmark(system.NewOptions(withOpts...)) r, err := benchrunner.Run(runner) if err != nil { @@ -293,14 +313,19 @@ func systemCommandAction(cmd *cobra.Command, args []string) error { return fmt.Errorf("system benchmark is expected to return multiple reports") } + reports := multiReport.Split() + if len(reports) != 2 { + return fmt.Errorf("system benchmark is expected to return a human an a file report") + } + // human report will always be the first - human := multiReport.Split()[0] + human := reports[0] if err := reporters.WriteReportable(reporters.Output(outputs.ReportOutputSTDOUT), human); err != nil { return fmt.Errorf("error writing benchmark report: %w", err) } // file report will always be the second - file := multiReport.Split()[1] + file := reports[1] if err := reporters.WriteReportable(reporters.Output(outputs.ReportOutputFile), file); err != nil { return fmt.Errorf("error writing benchmark report: %w", err) } @@ -375,3 +400,30 @@ func generateDataStreamCorpusCommandAction(cmd *cobra.Command, _ []string) error return nil } + +func initializeESMetricsClient(ctx context.Context) (*elasticsearch.Client, error) { + address := os.Getenv(system.ESMetricstoreHostEnv) + user := os.Getenv(system.ESMetricstoreUsernameEnv) + pass := os.Getenv(system.ESMetricstorePasswordEnv) + cacert := os.Getenv(system.ESMetricstoreCACertificateEnv) + if address == "" || user == "" || pass == "" { + logger.Debugf("can't initialize metricstore, missing environment configuration") + return nil, nil + } + + esClient, err := 
stack.NewElasticsearchClient( + elasticsearch.OptionWithAddress(address), + elasticsearch.OptionWithUsername(user), + elasticsearch.OptionWithPassword(pass), + elasticsearch.OptionWithCertificateAuthority(cacert), + ) + if err != nil { + return nil, err + } + + if err := esClient.CheckHealth(ctx); err != nil { + return nil, err + } + + return esClient, nil +} diff --git a/cmd/service.go b/cmd/service.go index f396c50fa6..c095385c01 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/elastic-package/internal/install" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/service" + "github.com/elastic/elastic-package/internal/testrunner/runners/system" ) const serviceLongDescription = `Use this command to boot up the service stack that can be observed with the package. @@ -70,6 +71,7 @@ func upCommandAction(cmd *cobra.Command, args []string) error { Profile: profile, ServiceName: serviceName, PackageRootPath: packageRoot, + DevDeployDir: system.DevDeployDir, DataStreamRootPath: dataStreamPath, Variant: variantFlag, }) diff --git a/docs/howto/sample_metric.json b/docs/howto/sample_metric.json new file mode 100644 index 0000000000..bc1bdaf612 --- /dev/null +++ b/docs/howto/sample_metric.json @@ -0,0 +1,670 @@ +{ + "@timestamp": 1692610780000, + "cluster_name": "elasticsearch", + "node_name": "M_lyy0n9TSOjGt3kPhnM8Q", + "Breakers": { + "eql_sequence": { + "limit_size_in_bytes": 536870912, + "estimated_size_in_bytes": 0, + "overhead": 1, + "tripped": 0 + }, + "fielddata": { + "limit_size_in_bytes": 429496729, + "estimated_size_in_bytes": 2456, + "overhead": 1.03, + "tripped": 0 + }, + "inflight_requests": { + "limit_size_in_bytes": 1073741824, + "estimated_size_in_bytes": 0, + "overhead": 2, + "tripped": 0 + }, + "model_inference": { + "limit_size_in_bytes": 536870912, + "estimated_size_in_bytes": 0, + "overhead": 1, + "tripped": 0 + }, + "parent": { + "limit_size_in_bytes": 
1020054732, + "estimated_size_in_bytes": 349638992, + "overhead": 1, + "tripped": 0 + }, + "request": { + "limit_size_in_bytes": 644245094, + "estimated_size_in_bytes": 0, + "overhead": 1, + "tripped": 0 + } + }, + "indices": { + "docs": { + "count": 502514, + "deleted": 101 + }, + "shard_stats": { + "total_count": 54 + }, + "store": { + "size_in_bytes": 231344895, + "total_data_set_size_in_bytes": 231344895, + "reserved_in_bytes": 0 + }, + "indexing": { + "index_total": 627678, + "index_time_in_millis": 192856, + "index_current": 0, + "index_failed": 43, + "delete_total": 80, + "delete_time_in_millis": 37, + "delete_current": 0, + "noop_update_total": 0, + "is_throttled": false, + "throttle_time_in_millis": 0, + "write_load": 0.00001370894459913557 + }, + "get": { + "total": 171437, + "time_in_millis": 21970, + "exists_total": 168394, + "exists_time_in_millis": 21727, + "missing_total": 3043, + "missing_time_in_millis": 243, + "current": 0 + }, + "search": { + "open_contexts": 0, + "query_total": 200120, + "query_time_in_millis": 98784, + "query_current": 0, + "fetch_total": 200076, + "fetch_time_in_millis": 8849, + "fetch_current": 0, + "scroll_total": 96697, + "scroll_time_in_millis": 10241728, + "scroll_current": 0, + "suggest_total": 0, + "suggest_time_in_millis": 0, + "suggest_current": 0 + }, + "merges": { + "current": 0, + "current_docs": 0, + "current_size_in_bytes": 0, + "total": 11503, + "total_time_in_millis": 187989, + "total_docs": 6782259, + "total_size_in_bytes": 2574092233, + "total_stopped_time_in_millis": 0, + "total_throttled_time_in_millis": 0, + "total_auto_throttle_in_bytes": 1132462080 + }, + "refresh": { + "total": 122510, + "total_time_in_millis": 905546, + "external_total": 77930, + "external_total_time_in_millis": 970183, + "listeners": 0 + }, + "flush": { + "total": 44585, + "periodic": 44585, + "total_time_in_millis": 2077597 + }, + "warmer": { + "current": 0, + "total": 77718, + "total_time_in_millis": 22729 + }, + "query_cache": { + 
"memory_size_in_bytes": 0, + "total_count": 0, + "hit_count": 0, + "miss_count": 0, + "cache_size": 0, + "cache_count": 0, + "evictions": 0 + }, + "fielddata": { + "memory_size_in_bytes": 2456, + "evictions": 0 + }, + "completion": { + "size_in_bytes": 0 + }, + "segments": { + "count": 743, + "memory_in_bytes": 0, + "terms_memory_in_bytes": 0, + "stored_fields_memory_in_bytes": 0, + "term_vectors_memory_in_bytes": 0, + "norms_memory_in_bytes": 0, + "points_memory_in_bytes": 0, + "doc_values_memory_in_bytes": 0, + "index_writer_memory_in_bytes": 2325280, + "version_map_memory_in_bytes": 348, + "fixed_bit_set_memory_in_bytes": 672, + "max_unsafe_auto_id_timestamp": 1691678283104, + "file_sizes": {} + }, + "translog": { + "operations": 219, + "size_in_bytes": 363972, + "uncommitted_operations": 219, + "uncommitted_size_in_bytes": 363972, + "earliest_last_modified_age": 694 + }, + "request_cache": { + "memory_size_in_bytes": 51488, + "evictions": 0, + "hit_count": 6830, + "miss_count": 1177 + }, + "recovery": { + "current_as_source": 0, + "current_as_target": 0, + "throttle_time_in_millis": 0 + }, + "bulk": { + "total_operations": 336226, + "total_time_in_millis": 222470, + "total_size_in_bytes": 967279105, + "avg_time_in_millis": 0, + "avg_size_in_bytes": 2840 + }, + "mappings": { + "total_count": 11497, + "total_estimated_overhead_in_bytes": 11772928 + } + }, + "jvm": { + "mem": { + "heap_used_in_bytes": 345444688, + "heap_used_percent": 32, + "heap_committed_in_bytes": 1073741824, + "heap_max_in_bytes": 1073741824, + "non_heap_used_in_bytes": 260239608, + "non_heap_committed_in_bytes": 309788672, + "pools": { + "old": { + "used_in_bytes": 199928320, + "max_in_bytes": 1073741824, + "peak_used_in_bytes": 214294528, + "peak_max_in_bytes": 1073741824 + }, + "survivor": { + "used_in_bytes": 32270160, + "max_in_bytes": 0, + "peak_used_in_bytes": 83886080, + "peak_max_in_bytes": 0 + }, + "young": { + "used_in_bytes": 113246208, + "max_in_bytes": 0, + "peak_used_in_bytes": 
629145600, + "peak_max_in_bytes": 0 + } + } + }, + "gc": { + "collectors": { + "G1 Concurrent GC": { + "collection_count": 604, + "collection_time_in_millis": 5896 + }, + "old": { + "collection_count": 0, + "collection_time_in_millis": 0 + }, + "young": { + "collection_count": 1511, + "collection_time_in_millis": 17790 + } + } + }, + "buffer_pools": { + "direct": { + "count": 78, + "used_in_bytes": 10105958, + "total_capacity_in_bytes": 10105956 + }, + "mapped": { + "count": 761, + "used_in_bytes": 229499619, + "total_capacity_in_bytes": 229499619 + }, + "mapped - 'non-volatile memory'": { + "count": 0, + "used_in_bytes": 0, + "total_capacity_in_bytes": 0 + } + } + }, + "os": { + "mem": { + "total_in_bytes": 12560445440, + "adjusted_total_in_bytes": 12560445440, + "free_in_bytes": 4413337600, + "used_in_bytes": 8147107840, + "free_percent": 35, + "used_percent": 65 + } + }, + "process": { + "cpu": { + "percent": 0, + "total_in_millis": 9628610 + } + }, + "thread_pool": { + "analyze": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "auto_complete": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "azure_event_loop": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "ccr": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "cluster_coordination": { + "threads": 1, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 1, + "completed": 2147 + }, + "fetch_shard_started": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "fetch_shard_store": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "flush": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 44597 + }, + "force_merge": { + "threads": 0, + 
"queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "generic": { + "threads": 9, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 17, + "completed": 775114 + }, + "get": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "management": { + "threads": 5, + "queue": 0, + "active": 1, + "rejected": 0, + "largest": 5, + "completed": 628761 + }, + "ml_datafeed": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "ml_job_comms": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "ml_native_inference_comms": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "ml_utility": { + "threads": 2, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 2, + "completed": 284235 + }, + "profiling": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "refresh": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 6736640 + }, + "repository_azure": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "rollup_indexing": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "search": { + "threads": 13, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 13, + "completed": 506 + }, + "search_coordination": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "search_throttled": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "searchable_snapshots_cache_fetch_async": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "searchable_snapshots_cache_prewarming": { + "threads": 0, + 
"queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "security-crypto": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 9 + }, + "security-token-key": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "snapshot": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "snapshot_meta": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "system_critical_read": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 1581 + }, + "system_critical_write": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 180 + }, + "system_read": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 409012 + }, + "system_write": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 104814 + }, + "vector_tile_generation": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "warmer": { + "threads": 4, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 4, + "completed": 342944 + }, + "watcher": { + "threads": 0, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 0, + "completed": 0 + }, + "write": { + "threads": 8, + "queue": 0, + "active": 0, + "rejected": 0, + "largest": 8, + "completed": 868562 + } + }, + "transport": { + "server_open": 0, + "total_outbound_connections": 0, + "rx_count": 0, + "rx_size_in_bytes": 0, + "tx_count": 0, + "tx_size_in_bytes": 0, + "inbound_handling_time_histogram": [ + { + "ge_millis": 0, + "lt_millis": 1, + "count": 273891 + }, + { + "ge_millis": 1, + "lt_millis": 2, + "count": 274801 + }, + { + "ge_millis": 2, + "lt_millis": 4, + "count": 5713 + }, + { + "ge_millis": 4, + "lt_millis": 8, + "count": 
834 + }, + { + "ge_millis": 8, + "lt_millis": 16, + "count": 488 + }, + { + "ge_millis": 16, + "lt_millis": 32, + "count": 385 + }, + { + "ge_millis": 32, + "lt_millis": 64, + "count": 161 + }, + { + "ge_millis": 64, + "lt_millis": 128, + "count": 110 + }, + { + "ge_millis": 128, + "lt_millis": 256, + "count": 68 + }, + { + "ge_millis": 256, + "lt_millis": 512, + "count": 30 + }, + { + "ge_millis": 512, + "lt_millis": 1024, + "count": 2 + } + ], + "outbound_handling_time_histogram": [] + }, + "benchmark_metadata": { + "info": { + "benchmark": "logs-benchmark", + "run_id": "a317f7d4-97a2-4ddd-a537-faa6433e62fb" + }, + "parameter": { + "package": "system_benchmarks", + "description": "Benchmark 20MiB of data ingested", + "version": "999.999.999", + "policy_template": "testpo", + "input": "filestream", + "vars": null, + "data_stream": { + "name": "testds", + "vars": { + "paths": [ + "/tmp/service_logs/corpus-*" + ] + } + }, + "warmup_time_period": 10000000000, + "benchmark_time_period": 0, + "wait_for_data_timeout": 60000000000, + "corpora": { + "generator": { + "size": "20MiB", + "template": { + "raw": "", + "path": "./logs-benchmark/template.log", + "type": "" + }, + "config": { + "raw": null, + "path": "./logs-benchmark/config.yml" + }, + "fields": { + "raw": null, + "path": "./logs-benchmark/fields.yml" + } + }, + "input_service": null + } + } + } +} \ No newline at end of file diff --git a/docs/howto/system_benchmarking.md b/docs/howto/system_benchmarking.md index 2db242ad59..77c50da845 100644 --- a/docs/howto/system_benchmarking.md +++ b/docs/howto/system_benchmarking.md @@ -13,15 +13,15 @@ Conceptually, running a system benchmark involves the following steps: 1. Create a benchmark policy that configures a single data stream for a single package. 1. Assign the policy to the enrolled Agent(s). 1. Metrics collections from the cluster starts. (**TODO**: record metrics from all Elastic Agents involved using the `system` integration.) -1. 
**TODO**: Send the collected metrics to the ES Metricstore if set. -1. Generate data if configured (it uses the [corpus-generator-rool](https://github.com/elastic/elastic-integration-corpus-generator-tool)) +1. Send the collected metrics to the ES Metricstore if set. +1. Generate data if configured (it uses the [corpus-generator-tool](https://github.com/elastic/elastic-integration-corpus-generator-tool)) 1. Wait a reasonable amount of time for the Agent to collect data from the integration service and index it into the correct Elasticsearch data stream. This time can be pre-defined with the `benchmark_time`. In case this setting is not set the benchmark will continue until the number of documents is not changed in the data stream. 1. Metrics collection ends and a summary report is created. 1. Delete test artifacts and tear down the instance of the package's integration service. -1. **TODO**: Optionally reindex all ingested data into the ES Metricstore for further analysis. +1. Optionally reindex all ingested data into the ES Metricstore for further analysis. 1. **TODO**: Optionally compare results against another benchmark run. ## Defining a system benchmark scenario @@ -42,8 +42,8 @@ Optionally system benchmarks can define a configuration for deploying a package' `` - a name of the supported service deployer: * `docker` - Docker Compose - -**TODO**: support other service deployers +* `k8s` - Kubernetes +* `tf` - Terraform ### Docker Compose service deployer @@ -54,6 +54,163 @@ The `docker-compose.yml` file defines the integration service(s) for the package when they are stopped. Docker compose may not be able to find volumes defined in the Dockerfile for this cleanup. In these cases, override the volume definition. +### Terraform service deployer + +When using the Terraform service deployer, the `` must include at least one `*.tf` file. +The `*.tf` files define the infrastructure using the Terraform syntax. 
The terraform based service can be handy to boot up +resources using a selected cloud provider and use them for testing (e.g. observe and collect metrics). + +Sample `main.tf` definition: + +``` +variable "TEST_RUN_ID" { + default = "detached" +} + +provider "aws" {} + +resource "aws_instance" "i" { + ami = data.aws_ami.latest-amzn.id + monitoring = true + instance_type = "t1.micro" + tags = { + Name = "elastic-package-test-${var.TEST_RUN_ID}" + } +} + +data "aws_ami" "latest-amzn" { + most_recent = true + owners = [ "amazon" ] # AWS + filter { + name = "name" + values = ["amzn2-ami-hvm-*"] + } +} +``` + +Notice the use of the `TEST_RUN_ID` variable. It contains a unique ID, which can help differentiate resources created in potential concurrent test runs. + +#### Terraform Outputs + +The outputs generated by the terraform service deployer can be accessed in the system test config using handlebars templates. +For example, if a `SQS queue` is configured in terraform and if the `queue_url` is configured as an output, it can be used in the test config as a handlebars template `{{TF_OUTPUT_queue_url}}` + +Sample Terraform definition + +``` +resource "aws_sqs_queue" "test" { + +} + +output "queue_url"{ + value = aws_sqs_queue.test.url +} +``` + +Sample system test config + +``` yaml +data_stream: + vars: + period: 5m + latency: 10m + queue_url: '{{TF_OUTPUT_queue_url}}' + tags_filter: |- + - key: Name + value: "elastic-package-test-{{TEST_RUN_ID}}" +``` + +For complex outputs from terraform you can use `{{TF_OUTPUT_root_key.nested_key}}` + +``` +output "root_key"{ + value = someoutput.nested_key_value +} +``` +``` json +{ + "root_key": { + "sensitive": false, + "type": [ + "object", + { + "nested_key": "string" + } + ], + "value": { + "nested_key": "this is a nested key" + } + } +} +``` +``` yaml +data_stream: + vars: + queue_url: '{{TF_OUTPUT_root_key.nested_key}}' +``` + +#### Environment variables + +To use environment variables within the Terraform service deployer a 
`env.yml` file is required. + +The file should be structured like this: + +```yaml +version: '2.3' +services: + terraform: + environment: + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} +``` + +Its purpose is to inject environment variables in the Terraform service deployer environment. + +To specify a default use this syntax: `AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-default}`, replacing `default` with the desired default value. + +**NOTE**: Terraform requires variables passed through the environment to be prefixed with `TF_VAR_`. These variables are not available in test case definitions because they are [not injected](https://github.com/elastic/elastic-package/blob/f5312b6022e3527684e591f99e73992a73baafcf/internal/testrunner/runners/system/servicedeployer/terraform_env.go#L43) in the test environment. + +#### Cloud Provider CI support + +Terraform is often used to interact with Cloud Providers. This requires Cloud Provider credentials. + +Injecting credentials can be achieved with functions from the [`apm-pipeline-library`](https://github.com/elastic/apm-pipeline-library/tree/main/vars) Jenkins library. For example look for `withAzureCredentials`, `withAWSEnv` or `withGCPEnv`. + +#### Tagging/labelling created Cloud Provider resources + +Leveraging Terraform to create cloud resources is useful but risks creating leftover resources that are difficult to remove. 
+ +There are some specific environment variables that should be leveraged to overcome this issue; these variables are already injected to be used by Terraform (through `TF_VAR_`): +- `TF_VAR_TEST_RUN_ID`: a unique identifier for the test run, allows to distinguish each run +- `BRANCH_NAME_LOWER_CASE`: the branch name or PR number the CI run is linked to +- `BUILD_ID`: incremental number providing the current CI run number +- `CREATED_DATE`: the creation date in epoch time, milliseconds, when the resource was created +- `ENVIRONMENT`: what environment created the resource (`ci`) +- `REPO`: the GitHub repository name (`elastic-package`) + +### Kubernetes service deployer + +The Kubernetes service deployer requires the `_dev/benchmark/system/deploy/k8s` directory to be present. It can include additional `*.yaml` files to deploy +custom applications in the Kubernetes cluster (e.g. Nginx deployment). It is possible to use a `kustomization.yaml` file. +If no resource definitions (`*.yaml` files ) are needed, +the `_dev/benchmark/system/deploy/k8s` directory must contain an `.empty` file (to preserve the `k8s` directory under version control). + +The Kubernetes service deployer needs [kind](https://kind.sigs.k8s.io/) to be installed and the cluster to be up and running: +```bash +wget -qO- https://raw.githubusercontent.com/elastic/elastic-package/main/scripts/kind-config.yaml | kind create cluster --config - +``` + +Before executing system tests, the service deployer applies once the deployment of the Elastic Agent to the cluster and links +the kind cluster with the Elastic stack network - applications running in the kind cluster can reach Elasticsearch and Kibana instances. +To shorten the total test execution time the Elastic Agent's deployment is not deleted after tests, but it can be reused. 
+ +See how to execute system benchmarks for the Kubernetes integration: + +```bash +elastic-package stack up -d -v # start the Elastic stack +wget -qO- https://raw.githubusercontent.com/elastic/elastic-package/main/scripts/kind-config.yaml | kind create cluster --config - +elastic-package benchmark system --benchmark k8s-benchmark -v +``` + ### Benchmark scenario definition Next, we must define at least one configuration for the package that we @@ -75,6 +232,7 @@ along with values for package and data stream-level variables. These are the ava | package | string | | The name of the package. If omitted will pick the current package, this is to allow for future definition of benchmarks outside of the packages folders. | | description | string | | A description for the scenario. | | version | string | | The version of the package to benchmark. If omitted will pick the current version of the package. | +| policy_template | string | | The policy template to test. If omitted will pick the first one. | | input | string | yes | Input type to test (e.g. logfile, httpjson, etc). Defaults to the input used by the first stream in the data stream manifest. | | vars | dictionary | | Package level variables to set (i.e. declared in `$package_root/manifest.yml`). If not specified the defaults from the manifest are used. | | data_stream.name | string | yes | The data stream to benchmark. | @@ -243,3 +401,33 @@ Finally, when you are done running the benchmark, bring down the Elastic Stack. elastic-package stack down ``` +## Setting up an external metricstore + +A metricstore can be set up to send metrics collected during the benchmark execution. + +An external metricstore might be useful for: + +- Store monitoring data of the benchmark scenario for all its execution time. +- Analyse the data generated during a benchmark. This is possible when using the `reindex-to-metricstore` flag. +- **TODO**: Store benchmark results for various benchmark runs permanently for later comparison. 
+ +In order to initialize it, you need to set up the following environment variables: + +```bash +export ELASTIC_PACKAGE_ESMETRICSTORE_HOST=https://127.0.0.1:9200 +export ELASTIC_PACKAGE_ESMETRICSTORE_USERNAME=elastic +export ELASTIC_PACKAGE_ESMETRICSTORE_PASSWORD=changeme +export ELASTIC_PACKAGE_ESMETRICSTORE_CA_CERT="$HOME/.elastic-package/profiles/default/certs/ca-cert.pem" +``` + +The only one that is optional is `ELASTIC_PACKAGE_ESMETRICSTORE_CA_CERT`. + +When these are detected, metrics will be automatically collected every second and sent to a new index called `bench-metrics-{dataset}-{testRunID}`. + +The collected metrics include the following node stats: `nodes.*.breakers`, `nodes.*.indices`, `nodes.*.jvm.mem`, `nodes.*.jvm.gc`, `nodes.*.jvm.buffer_pools`, `nodes.*.os.mem`, `nodes.*.process.cpu`, `nodes.*.thread_pool`, and `nodes.*.transport`. + +Ingest pipeline metrics are only collected at the end, since their collection would affect the benchmark results. + +You can see a sample collected metric [here](./sample_metric.json). + +Additionally, if the `reindex-to-metricstore` flag is used, the data generated during the benchmark will be sent to the metricstore into an index called `bench-reindex-{datastream}-{testRunID}` for further analysis. The events will be enriched with metadata related to the benchmark run. 
\ No newline at end of file diff --git a/internal/benchrunner/runner.go b/internal/benchrunner/runner.go index 00399d14d7..13e939957f 100644 --- a/internal/benchrunner/runner.go +++ b/internal/benchrunner/runner.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/elastic/elastic-package/internal/benchrunner/reporters" + "github.com/elastic/elastic-package/internal/logger" ) // Type represents the various supported benchmark types @@ -26,20 +27,21 @@ func Run(runner Runner) (reporters.Reportable, error) { return nil, errors.New("a runner is required") } + defer func() { + tdErr := runner.TearDown() + if tdErr != nil { + logger.Errorf("could not teardown benchmark runner: %v", tdErr) + } + }() + if err := runner.SetUp(); err != nil { return nil, fmt.Errorf("could not set up benchmark runner: %w", err) } report, err := runner.Run() - tdErr := runner.TearDown() - if err != nil { return nil, fmt.Errorf("could not complete benchmark run: %w", err) } - if tdErr != nil { - return report, fmt.Errorf("could not teardown benchmark runner: %w", tdErr) - } - return report, nil } diff --git a/internal/benchrunner/runners/system/metrics.go b/internal/benchrunner/runners/system/metrics.go index 40edae2c1e..3743d10d99 100644 --- a/internal/benchrunner/runners/system/metrics.go +++ b/internal/benchrunner/runners/system/metrics.go @@ -5,30 +5,49 @@ package system import ( + "bytes" + _ "embed" + "encoding/json" + "fmt" + "io" "sync" + "sync/atomic" "time" - "github.com/elastic/elastic-package/internal/benchrunner/runners/system/servicedeployer" "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/elasticsearch/ingest" + "github.com/elastic/elastic-package/internal/environment" "github.com/elastic/elastic-package/internal/logger" - "github.com/elastic/elastic-package/internal/signal" + "github.com/elastic/elastic-package/internal/servicedeployer" +) + +var ( + ESMetricstoreHostEnv = 
environment.WithElasticPackagePrefix("ESMETRICSTORE_HOST") + ESMetricstoreUsernameEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_USERNAME") + ESMetricstorePasswordEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_PASSWORD") + ESMetricstoreCACertificateEnv = environment.WithElasticPackagePrefix("ESMETRICSTORE_CA_CERT") ) type collector struct { - ctxt servicedeployer.ServiceContext - warmupD time.Duration + ctxt servicedeployer.ServiceContext + metadata benchMeta + scenario scenario + interval time.Duration - esapi *elasticsearch.API + esAPI *elasticsearch.API + metricsAPI *elasticsearch.API datastream string pipelinePrefix string - stopC chan struct{} - tick *time.Ticker + wg sync.WaitGroup + stopped atomic.Bool + stopC chan struct{} + tick *time.Ticker startIngestMetrics map[string]ingest.PipelineStatsMap endIngestMetrics map[string]ingest.PipelineStatsMap - collectedMetrics []metrics + startMetrics metrics + endMetrics metrics diskUsage map[string]ingest.DiskUsage startTotalHits int endTotalHits int @@ -50,100 +69,163 @@ type metricsSummary struct { IngestPipelineStats map[string]ingest.PipelineStatsMap DiskUsage map[string]ingest.DiskUsage TotalHits int + NodesStats map[string]ingest.NodeStats } func newCollector( ctxt servicedeployer.ServiceContext, - esapi *elasticsearch.API, - interval, warmup time.Duration, + benchName string, + scenario scenario, + esAPI, metricsAPI *elasticsearch.API, + interval time.Duration, datastream, pipelinePrefix string, ) *collector { + meta := benchMeta{Parameters: scenario} + meta.Info.Benchmark = benchName + meta.Info.RunID = ctxt.Test.RunID return &collector{ ctxt: ctxt, interval: interval, - warmupD: warmup, - esapi: esapi, + scenario: scenario, + metadata: meta, + esAPI: esAPI, + metricsAPI: metricsAPI, datastream: datastream, pipelinePrefix: pipelinePrefix, - stopC: make(chan struct{}, 1), + stopC: make(chan struct{}), } } func (c *collector) start() { c.tick = time.NewTicker(c.interval) + 
c.createMetricsIndex() + var once sync.Once + c.wg.Add(1) go func() { - var once sync.Once - once.Do(c.waitUntilReady) - defer c.tick.Stop() - - c.startIngestMetrics = c.collectIngestMetrics() - c.startTotalHits = c.collectTotalHits() - + defer c.wg.Done() for { - if signal.SIGINT() { - logger.Debug("SIGINT: cancel metrics collection") - c.collectMetricsPreviousToStop() - return - } - select { case <-c.stopC: // last collect before stopping c.collectMetricsPreviousToStop() - c.stopC <- struct{}{} + c.publish(c.createEventsFromMetrics(c.endMetrics)) return case <-c.tick.C: - c.collect() + once.Do(func() { + c.waitUntilReady() + c.startIngestMetrics = c.collectIngestMetrics() + c.startTotalHits = c.collectTotalHits() + c.startMetrics = c.collect() + c.publish(c.createEventsFromMetrics(c.startMetrics)) + }) + m := c.collect() + c.publish(c.createEventsFromMetrics(m)) } } }() } func (c *collector) stop() { - c.stopC <- struct{}{} - <-c.stopC + if !c.stopped.CompareAndSwap(false, true) { + return + } close(c.stopC) + c.wg.Wait() } -func (c *collector) collect() { +func (c *collector) collect() metrics { m := metrics{ ts: time.Now().Unix(), } - nstats, err := ingest.GetNodesStats(c.esapi) + nstats, err := ingest.GetNodesStats(c.esAPI) if err != nil { logger.Debug(err) } else { m.nMetrics = nstats } - dsstats, err := ingest.GetDataStreamStats(c.esapi, c.datastream) + dsstats, err := ingest.GetDataStreamStats(c.esAPI, c.datastream) if err != nil { logger.Debug(err) } else { m.dsMetrics = dsstats } - c.collectedMetrics = append(c.collectedMetrics, m) + return m +} + +func (c *collector) publish(events [][]byte) { + if c.metricsAPI == nil { + return + } + for _, e := range events { + reqBody := bytes.NewReader(e) + resp, err := c.metricsAPI.Index(c.indexName(), reqBody) + if err != nil { + logger.Debugf("error indexing event: %v", err) + continue + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + logger.Errorf("failed to read index response body: %v", err) + } + 
resp.Body.Close() + + if resp.StatusCode != 201 { + logger.Errorf("error indexing event (%d): %s: %v", resp.StatusCode, resp.Status(), elasticsearch.NewError(body)) + } + } +} + +//go:embed metrics_index.json +var metricsIndexBytes []byte + +func (c *collector) createMetricsIndex() { + if c.metricsAPI == nil { + return + } + + reader := bytes.NewReader(metricsIndexBytes) + + logger.Debugf("creating %s index in metricstore...", c.indexName()) + + createRes, err := c.metricsAPI.Indices.Create( + c.indexName(), + c.metricsAPI.Indices.Create.WithBody(reader), + ) + if err != nil { + logger.Debugf("could not create index: %v", err) + return + } + createRes.Body.Close() + + if createRes.IsError() { + logger.Debug("got a response error while creating index") + } +} + +func (c *collector) indexName() string { + return fmt.Sprintf("bench-metrics-%s-%s", c.datastream, c.ctxt.Test.RunID) } func (c *collector) summarize() (*metricsSummary, error) { sum := metricsSummary{ - RunID: c.ctxt.Bench.RunID, + RunID: c.ctxt.Test.RunID, IngestPipelineStats: make(map[string]ingest.PipelineStatsMap), + NodesStats: make(map[string]ingest.NodeStats), DiskUsage: c.diskUsage, TotalHits: c.endTotalHits - c.startTotalHits, } - if len(c.collectedMetrics) > 0 { - sum.CollectionStartTs = c.collectedMetrics[0].ts - sum.CollectionEndTs = c.collectedMetrics[len(c.collectedMetrics)-1].ts - sum.DataStreamStats = c.collectedMetrics[len(c.collectedMetrics)-1].dsMetrics - sum.ClusterName = c.collectedMetrics[0].nMetrics.ClusterName - sum.Nodes = len(c.collectedMetrics[len(c.collectedMetrics)-1].nMetrics.Nodes) - } + sum.ClusterName = c.startMetrics.nMetrics.ClusterName + sum.CollectionStartTs = c.startMetrics.ts + sum.CollectionEndTs = c.endMetrics.ts + sum.DataStreamStats = c.endMetrics.dsMetrics + sum.Nodes = len(c.endMetrics.nMetrics.Nodes) for node, endPStats := range c.endIngestMetrics { startPStats, found := c.startIngestMetrics[node] @@ -160,8 +242,9 @@ func (c *collector) summarize() 
(*metricsSummary, error) { } sumStats[pname] = ingest.PipelineStats{ StatsRecord: ingest.StatsRecord{ - Count: endStats.Count - startStats.Count, - Failed: endStats.TimeInMillis - startStats.TimeInMillis, + Count: endStats.Count - startStats.Count, + Failed: endStats.Failed - startStats.Failed, + TimeInMillis: endStats.TimeInMillis - startStats.TimeInMillis, }, Processors: make([]ingest.ProcessorStats, len(endStats.Processors)), } @@ -193,13 +276,12 @@ func (c *collector) waitUntilReady() { readyLoop: for { - if signal.SIGINT() { - logger.Debug("SIGINT: cancel metrics collection") + select { + case <-c.stopC: return + case <-waitTick.C: } - - <-waitTick.C - dsstats, err := ingest.GetDataStreamStats(c.esapi, c.datastream) + dsstats, err := ingest.GetDataStreamStats(c.esAPI, c.datastream) if err != nil { logger.Debug(err) } @@ -208,42 +290,89 @@ readyLoop: } } - if c.warmupD > 0 { - logger.Debugf("waiting %s for warmup period", c.warmupD) - <-time.After(c.warmupD) + if c.scenario.WarmupTimePeriod > 0 { + logger.Debugf("waiting %s for warmup period", c.scenario.WarmupTimePeriod) + select { + case <-c.stopC: + return + case <-time.After(c.scenario.WarmupTimePeriod): + } } logger.Debug("metric collection starting...") } func (c *collector) collectIngestMetrics() map[string]ingest.PipelineStatsMap { - ipMetrics, err := ingest.GetPipelineStatsByPrefix(c.esapi, c.pipelinePrefix) + ipMetrics, err := ingest.GetPipelineStatsByPrefix(c.esAPI, c.pipelinePrefix) if err != nil { - logger.Debugf("could not get ingest pipeline metrics: %w", err) + logger.Debugf("could not get ingest pipeline metrics: %v", err) return nil } return ipMetrics } func (c *collector) collectDiskUsage() map[string]ingest.DiskUsage { - du, err := ingest.GetDiskUsage(c.esapi, c.datastream) + du, err := ingest.GetDiskUsage(c.esAPI, c.datastream) if err != nil { - logger.Debugf("could not get disk usage metrics: %w", err) + logger.Debugf("could not get disk usage metrics: %v", err) return nil } return du } 
func (c *collector) collectMetricsPreviousToStop() { - c.collect() c.endIngestMetrics = c.collectIngestMetrics() c.diskUsage = c.collectDiskUsage() c.endTotalHits = c.collectTotalHits() + c.endMetrics = c.collect() } func (c *collector) collectTotalHits() int { - totalHits, err := getTotalHits(c.esapi, c.datastream) + totalHits, err := getTotalHits(c.esAPI, c.datastream) if err != nil { logger.Debugf("could not total hits: %w", err) } return totalHits } + +func (c *collector) createEventsFromMetrics(m metrics) [][]byte { + dsEvent := struct { + Timestamp int64 `json:"@timestamp"` + *ingest.DataStreamStats + Meta benchMeta `json:"benchmark_metadata"` + }{ + Timestamp: m.ts * 1000, // ms to s + DataStreamStats: m.dsMetrics, + Meta: c.metadata, + } + + type nEvent struct { + Ts int64 `json:"@timestamp"` + ClusterName string `json:"cluster_name"` + NodeName string `json:"node_name"` + *ingest.NodeStats + Meta benchMeta `json:"benchmark_metadata"` + } + + var nEvents []interface{} + + for node, stats := range m.nMetrics.Nodes { + nEvents = append(nEvents, nEvent{ + Ts: m.ts * 1000, // ms to s + ClusterName: m.nMetrics.ClusterName, + NodeName: node, + NodeStats: &stats, + Meta: c.metadata, + }) + } + + var events [][]byte + for _, e := range append(nEvents, dsEvent) { + b, err := json.Marshal(e) + if err != nil { + logger.Debugf("error marhsaling metrics event: %w", err) + continue + } + events = append(events, b) + } + return events +} diff --git a/internal/benchrunner/runners/system/metrics_index.json b/internal/benchrunner/runners/system/metrics_index.json new file mode 100644 index 0000000000..5d4e724da0 --- /dev/null +++ b/internal/benchrunner/runners/system/metrics_index.json @@ -0,0 +1,24 @@ +{ + "settings": { + "number_of_replicas": 0 + }, + "mappings": { + "dynamic_templates": [ + { + "strings_as_keyword": { + "match_mapping_type": "string", + "mapping": { + "ignore_above": 1024, + "type": "keyword" + } + } + } + ], + "date_detection": false, + "properties": { + 
"@timestamp": { + "type": "date" + } + } + } +} \ No newline at end of file diff --git a/internal/benchrunner/runners/system/options.go b/internal/benchrunner/runners/system/options.go index dd540569ff..e719a027c7 100644 --- a/internal/benchrunner/runners/system/options.go +++ b/internal/benchrunner/runners/system/options.go @@ -14,15 +14,16 @@ import ( // Options contains benchmark runner options. type Options struct { - Profile *profile.Profile - ESAPI *elasticsearch.API - KibanaClient *kibana.Client - DeferCleanup time.Duration - MetricsInterval time.Duration - ReindexData bool - MetricstoreESURL string - BenchName string - PackageRootPath string + ESAPI *elasticsearch.API + KibanaClient *kibana.Client + DeferCleanup time.Duration + MetricsInterval time.Duration + ReindexData bool + ESMetricsAPI *elasticsearch.API + BenchName string + PackageRootPath string + Variant string + Profile *profile.Profile } type OptionFunc func(*Options) @@ -64,17 +65,31 @@ func WithDeferCleanup(d time.Duration) OptionFunc { opts.DeferCleanup = d } } + func WithMetricsInterval(d time.Duration) OptionFunc { return func(opts *Options) { opts.MetricsInterval = d } } + func WithDataReindexing(b bool) OptionFunc { return func(opts *Options) { opts.ReindexData = b } } +func WithESMetricsAPI(api *elasticsearch.API) OptionFunc { + return func(opts *Options) { + opts.ESMetricsAPI = api + } +} + +func WithVariant(name string) OptionFunc { + return func(opts *Options) { + opts.Variant = name + } +} + func WithProfile(p *profile.Profile) OptionFunc { return func(opts *Options) { opts.Profile = p diff --git a/internal/benchrunner/runners/system/report.go b/internal/benchrunner/runners/system/report.go index 5aa5430a21..4d6d57016a 100644 --- a/internal/benchrunner/runners/system/report.go +++ b/internal/benchrunner/runners/system/report.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/dustin/go-humanize" "github.com/jedib0t/go-pretty/table" "github.com/jedib0t/go-pretty/text" @@ -78,7 
+79,7 @@ func newReport(benchName, corporaFile string, s *scenario, sum *metricsSummary) report.Parameters.DataStream = s.DataStream report.Parameters.WarmupTimePeriod = s.WarmupTimePeriod report.Parameters.BenchmarkTimePeriod = s.BenchmarkTimePeriod - report.Parameters.WaitForDataTimeout = s.WaitForDataTimeout + report.Parameters.WaitForDataTimeout = *s.WaitForDataTimeout report.Parameters.Corpora = s.Corpora report.ClusterName = sum.ClusterName report.Nodes = sum.Nodes @@ -173,20 +174,31 @@ func reportHumanFormat(r *report) []byte { adu := du.AllFields report.WriteString(renderBenchmarkTable( fmt.Sprintf("disk usage for index %s (for all fields)", index), - "total", adu.Total, - "inverted_index.total", adu.InvertedIndex.Total, - "inverted_index.stored_fields", adu.StoredFields, - "inverted_index.doc_values", adu.DocValues, - "inverted_index.points", adu.Points, - "inverted_index.norms", adu.Norms, - "inverted_index.term_vectors", adu.TermVectors, - "inverted_index.knn_vectors", adu.KnnVectors, + "total", humanize.Bytes(adu.TotalInBytes), + "inverted_index.total", humanize.Bytes(adu.InvertedIndex.TotalInBytes), + "inverted_index.stored_fields", humanize.Bytes(adu.StoredFieldsInBytes), + "inverted_index.doc_values", humanize.Bytes(adu.DocValuesInBytes), + "inverted_index.points", humanize.Bytes(adu.PointsInBytes), + "inverted_index.norms", humanize.Bytes(adu.NormsInBytes), + "inverted_index.term_vectors", humanize.Bytes(adu.TermVectorsInBytes), + "inverted_index.knn_vectors", humanize.Bytes(adu.KnnVectorsInBytes), ) + "\n") } for node, pStats := range r.IngestPipelineStats { for pipeline, stats := range pStats { - var kvs []interface{} + if stats.Count == 0 { + continue + } + kvs := []interface{}{ + "Totals", + fmt.Sprintf( + "Count: %d | Failed: %d | Time: %s", + stats.Count, + stats.Failed, + time.Duration(stats.TimeInMillis)*time.Millisecond, + ), + } for _, procStats := range stats.Processors { str := fmt.Sprintf( "Count: %d | Failed: %d | Time: %s", diff --git 
a/internal/benchrunner/runners/system/runner.go b/internal/benchrunner/runners/system/runner.go index 5001f2c177..e8be30e0c4 100644 --- a/internal/benchrunner/runners/system/runner.go +++ b/internal/benchrunner/runners/system/runner.go @@ -26,13 +26,13 @@ import ( "github.com/elastic/elastic-package/internal/benchrunner" "github.com/elastic/elastic-package/internal/benchrunner/reporters" - "github.com/elastic/elastic-package/internal/benchrunner/runners/system/servicedeployer" "github.com/elastic/elastic-package/internal/configuration/locations" "github.com/elastic/elastic-package/internal/elasticsearch" "github.com/elastic/elastic-package/internal/kibana" "github.com/elastic/elastic-package/internal/logger" "github.com/elastic/elastic-package/internal/multierror" "github.com/elastic/elastic-package/internal/packages" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/signal" ) @@ -40,11 +40,8 @@ const ( // ServiceLogsAgentDir is folder path where log files produced by the service // are stored on the Agent container's filesystem. 
ServiceLogsAgentDir = "/tmp/service_logs" + devDeployDir = "_dev/benchmark/system/deploy" - waitForDataDefaultTimeout = 10 * time.Minute -) - -const ( // BenchType defining system benchmark BenchType benchrunner.Type = "system" ) @@ -140,7 +137,13 @@ func (r *runner) setUp() error { serviceLogsDir := locationManager.ServiceLogDir() r.ctxt.Logs.Folder.Local = serviceLogsDir r.ctxt.Logs.Folder.Agent = ServiceLogsAgentDir - r.ctxt.Bench.RunID = createRunID() + r.ctxt.Test.RunID = createRunID() + + outputDir, err := servicedeployer.CreateOutputDir(locationManager, r.ctxt.Test.RunID) + if err != nil { + return fmt.Errorf("could not create output dir for terraform deployer %w", err) + } + r.ctxt.OutputDir = outputDir scenario, err := readConfig(r.options.PackageRootPath, r.options.BenchName, r.ctxt) if err != nil { @@ -229,10 +232,14 @@ func (r *runner) run() (report reporters.Reportable, err error) { if r.scenario.Corpora.InputService != nil { // Setup service. logger.Debug("setting up service...") - serviceDeployer, err := servicedeployer.Factory(servicedeployer.FactoryOptions{ - Profile: r.options.Profile, - RootPath: r.options.PackageRootPath, - }) + opts := servicedeployer.FactoryOptions{ + PackageRootPath: r.options.PackageRootPath, + DevDeployDir: devDeployDir, + Variant: r.options.Variant, + Profile: r.options.Profile, + Type: servicedeployer.TypeBench, + } + serviceDeployer, err := servicedeployer.Factory(opts) if err != nil { return nil, fmt.Errorf("could not create service runner: %w", err) @@ -256,6 +263,7 @@ func (r *runner) run() (report reporters.Reportable, err error) { } r.startMetricsColletion() + defer r.mcollector.stop() // if there is a generator config, generate the data if r.generator != nil { @@ -277,28 +285,35 @@ func (r *runner) run() (report reporters.Reportable, err error) { } } - if err := r.waitUntilBenchmarkFinishes(); err != nil { + finishedOnTime, err := r.waitUntilBenchmarkFinishes() + if err != nil { return nil, err } + if 
!finishedOnTime { + return nil, errors.New("timeout exceeded") + } msum, err := r.collectAndSummarizeMetrics() if err != nil { return nil, fmt.Errorf("can't summarize metrics: %w", err) } - // TODO reindex if configured and es metricstore is set + if err := r.reindexData(); err != nil { + return nil, fmt.Errorf("can't reindex data: %w", err) + } return createReport(r.options.BenchName, r.corporaFile, r.scenario, msum) } func (r *runner) startMetricsColletion() { - // TODO send metrics to es metricstore if set // TODO collect agent hosts metrics using system integration r.mcollector = newCollector( r.ctxt, + r.options.BenchName, + *r.scenario, r.options.ESAPI, + r.options.ESMetricsAPI, r.options.MetricsInterval, - r.scenario.WarmupTimePeriod, r.runtimeDataStream, r.pipelinePrefix, ) @@ -375,15 +390,18 @@ func (r *runner) createPackagePolicy(pkgManifest *packages.PackageManifest, p *k r.scenario.Package = pkgManifest.Name } - // TODO: add ability to define which policy template to use + if r.scenario.PolicyTemplate == "" { + r.scenario.PolicyTemplate = pkgManifest.PolicyTemplates[0].Name + } + pp := kibana.PackagePolicy{ Namespace: "ep", PolicyID: p.ID, - Vars: r.scenario.Vars, Force: true, Inputs: map[string]kibana.PackagePolicyInput{ - fmt.Sprintf("%s-%s", pkgManifest.PolicyTemplates[0].Name, r.scenario.Input): { + fmt.Sprintf("%s-%s", r.scenario.PolicyTemplate, r.scenario.Input): { Enabled: true, + Vars: r.scenario.Vars, Streams: map[string]kibana.PackagePolicyStream{ fmt.Sprintf("%s.%s", pkgManifest.Name, r.scenario.DataStream.Name): { Enabled: true, @@ -539,6 +557,10 @@ func (r *runner) runGenerator(destDir string) error { } defer f.Close() + if err := f.Chmod(os.ModePerm); err != nil { + return err + } + buf := bytes.NewBufferString("") var corpusDocsCount uint64 for { @@ -601,19 +623,15 @@ func (r *runner) checkEnrolledAgents() ([]kibana.Agent, error) { return agents, nil } -func (r *runner) waitUntilBenchmarkFinishes() error { +func (r *runner) 
waitUntilBenchmarkFinishes() (bool, error) { logger.Debug("checking for all data in data stream...") var benchTime *time.Timer if r.scenario.BenchmarkTimePeriod > 0 { benchTime = time.NewTimer(r.scenario.BenchmarkTimePeriod) } - waitForDataTimeout := waitForDataDefaultTimeout - if r.scenario.WaitForDataTimeout > 0 { - waitForDataTimeout = r.scenario.WaitForDataTimeout - } oldHits := 0 - _, err := waitUntilTrue(func() (bool, error) { + return waitUntilTrue(func() (bool, error) { if signal.SIGINT() { return true, errors.New("SIGINT: cancel waiting for policy assigned") } @@ -639,8 +657,7 @@ func (r *runner) waitUntilBenchmarkFinishes() error { } return ret, err - }, waitForDataTimeout) - return err + }, *r.scenario.WaitForDataTimeout) } func (r *runner) enrollAgents() error { @@ -692,6 +709,167 @@ func (r *runner) enrollAgents() error { return nil } +// reindexData will read all data generated during the benchmark and will reindex it to the metrisctore +func (r *runner) reindexData() error { + if !r.options.ReindexData { + return nil + } + if r.options.ESMetricsAPI == nil { + return errors.New("the option to reindex data is set, but the metricstore was not initialized") + } + + logger.Debug("starting reindexing of data...") + + logger.Debug("getting orignal mappings...") + // Get the mapping from the source data stream + mappingRes, err := r.options.ESAPI.Indices.GetMapping( + r.options.ESAPI.Indices.GetMapping.WithIndex(r.runtimeDataStream), + ) + if err != nil { + return fmt.Errorf("error getting mapping: %w", err) + } + defer mappingRes.Body.Close() + + body, err := io.ReadAll(mappingRes.Body) + if err != nil { + return fmt.Errorf("error reading mapping body: %w", err) + } + + mappings := map[string]struct { + Mappings json.RawMessage + }{} + + if err := json.Unmarshal(body, &mappings); err != nil { + return fmt.Errorf("error unmarshaling mappings: %w", err) + } + + if len(mappings) != 1 { + return fmt.Errorf("exactly 1 mapping was expected, got %d", 
len(mappings)) + } + + var mapping string + for _, v := range mappings { + mapping = string(v.Mappings) + } + + reader := bytes.NewReader( + []byte(fmt.Sprintf(`{ + "settings": {"number_of_replicas":0}, + "mappings": %s + }`, mapping)), + ) + + indexName := fmt.Sprintf("bench-reindex-%s-%s", r.runtimeDataStream, r.ctxt.Test.RunID) + + logger.Debugf("creating %s index in metricstore...", indexName) + + createRes, err := r.options.ESMetricsAPI.Indices.Create( + indexName, + r.options.ESMetricsAPI.Indices.Create.WithBody(reader), + ) + if err != nil { + return fmt.Errorf("could not create index: %w", err) + } + defer createRes.Body.Close() + + if createRes.IsError() { + return errors.New("got a response error while creating index") + } + + bodyReader := strings.NewReader(`{"query":{"match_all":{}}}`) + + logger.Debug("starting scrolling of events...") + res, err := r.options.ESAPI.Search( + r.options.ESAPI.Search.WithIndex(r.runtimeDataStream), + r.options.ESAPI.Search.WithBody(bodyReader), + r.options.ESAPI.Search.WithScroll(time.Minute), + r.options.ESAPI.Search.WithSize(10000), + ) + if err != nil { + return fmt.Errorf("error executing search: %w", err) + } + defer res.Body.Close() + + type searchRes struct { + Error *struct { + Reason string `json:"reson"` + } `json:"error"` + ScrollID string `json:"_scroll_id"` + Hits []struct { + ID string `json:"_id"` + Source map[string]interface{} `json:"_source"` + } `json:"hits"` + } + + // Iterate through the search results using the Scroll API + for { + var sr searchRes + if err := json.NewDecoder(res.Body).Decode(&sr); err != nil { + return fmt.Errorf("error decoding search response: %w", err) + } + + if sr.Error != nil { + return fmt.Errorf("error searching for documents: %s", sr.Error.Reason) + } + + if len(sr.Hits) == 0 { + break + } + + var bulkBodyBuilder strings.Builder + for _, hit := range sr.Hits { + bulkBodyBuilder.WriteString(fmt.Sprintf("{\"index\":{\"_index\":\"%s\",\"_id\":\"%s\"}}\n", indexName, hit.ID)) + 
enriched := r.enrichEventWithBenchmarkMetadata(hit.Source) + src, err := json.Marshal(enriched) + if err != nil { + return fmt.Errorf("error decoding _source: %w", err) + } + bulkBodyBuilder.WriteString(fmt.Sprintf("%s\n", string(src))) + } + + logger.Debugf("bulk request of %d events...", len(sr.Hits)) + + bulkRes, err := r.options.ESMetricsAPI.Bulk(strings.NewReader(bulkBodyBuilder.String())) + if err != nil { + return fmt.Errorf("error performing the bulk index request: %w", err) + } + bulkRes.Body.Close() + + if sr.ScrollID == "" { + return errors.New("error getting scroll ID") + } + + res, err = r.options.ESAPI.Scroll( + r.options.ESAPI.Scroll.WithScrollID(sr.ScrollID), + r.options.ESAPI.Scroll.WithScroll(time.Minute), + ) + if err != nil { + return fmt.Errorf("error executing scroll: %s", err) + } + res.Body.Close() + } + + logger.Debug("reindexing operation finished") + return nil +} + +type benchMeta struct { + Info struct { + Benchmark string `json:"benchmark"` + RunID string `json:"run_id"` + } `json:"info"` + Parameters scenario `json:"parameter"` +} + +func (r *runner) enrichEventWithBenchmarkMetadata(e map[string]interface{}) map[string]interface{} { + var m benchMeta + m.Info.Benchmark = r.options.BenchName + m.Info.RunID = r.ctxt.Test.RunID + m.Parameters = *r.scenario + e["benchmark_metadata"] = m + return e +} + func getTotalHits(esapi *elasticsearch.API, dataStream string) (int, error) { resp, err := esapi.Count( esapi.Count.WithIndex(dataStream), @@ -738,7 +916,7 @@ func filterAgents(allAgents []kibana.Agent) []kibana.Agent { switch { case agent.LocalMetadata.Host.Name == "docker-fleet-server", agent.PolicyID == "fleet-server-policy", - agent.PolicyID == "Elastic Cloud agent policy": + agent.PolicyID == "policy-elastic-agent-on-cloud": continue } filtered = append(filtered, agent) diff --git a/internal/benchrunner/runners/system/scenario.go b/internal/benchrunner/runners/system/scenario.go index be10101701..f0837d3e01 100644 --- 
a/internal/benchrunner/runners/system/scenario.go +++ b/internal/benchrunner/runners/system/scenario.go @@ -15,64 +15,68 @@ import ( "github.com/elastic/go-ucfg" "github.com/elastic/go-ucfg/yaml" - "github.com/elastic/elastic-package/internal/benchrunner/runners/system/servicedeployer" + "github.com/elastic/elastic-package/internal/servicedeployer" ) const devPath = "_dev/benchmark/system" type scenario struct { - Package string `config:"package"` - Description string `config:"description"` - Version string `config:"version"` - Input string `config:"input"` - Vars map[string]interface{} `config:"vars"` - DataStream dataStream `config:"data_stream"` - WarmupTimePeriod time.Duration `config:"warmup_time_period"` - BenchmarkTimePeriod time.Duration `config:"benchmark_time_period"` - WaitForDataTimeout time.Duration `config:"wait_for_data_timeout"` - Corpora corpora `config:"corpora"` + Package string `config:"package" json:"package"` + Description string `config:"description" json:"description"` + Version string `config:"version" json:"version"` + PolicyTemplate string `config:"policy_template" json:"policy_template"` + Input string `config:"input" json:"input"` + Vars map[string]interface{} `config:"vars" json:"vars"` + DataStream dataStream `config:"data_stream" json:"data_stream"` + WarmupTimePeriod time.Duration `config:"warmup_time_period" json:"warmup_time_period"` + BenchmarkTimePeriod time.Duration `config:"benchmark_time_period" json:"benchmark_time_period"` + WaitForDataTimeout *time.Duration `config:"wait_for_data_timeout" json:"wait_for_data_timeout"` + Corpora corpora `config:"corpora" json:"corpora"` } type dataStream struct { - Name string `config:"name"` - Vars map[string]interface{} `config:"vars"` + Name string `config:"name" json:"name"` + Vars map[string]interface{} `config:"vars" json:"vars"` } type corpora struct { - Generator *generator `config:"generator"` - InputService *inputService `config:"input_service"` + Generator *generator 
`config:"generator" json:"generator"` + InputService *inputService `config:"input_service" json:"input_service"` } type inputService struct { - Name string `config:"name"` - Signal string `config:"signal"` + Name string `config:"name" json:"name"` + Signal string `config:"signal" json:"signal"` } type generator struct { - Size string `config:"size"` - Template corporaTemplate `config:"template"` - Config corporaConfig `config:"config"` - Fields corporaFields `config:"fields"` + Size string `config:"size" json:"size"` + Template corporaTemplate `config:"template" json:"template"` + Config corporaConfig `config:"config" json:"config"` + Fields corporaFields `config:"fields" json:"fields"` } type corporaTemplate struct { - Raw string `config:"raw"` - Path string `config:"path"` - Type string `config:"type"` + Raw string `config:"raw" json:"raw"` + Path string `config:"path" json:"path"` + Type string `config:"type" json:"type"` } type corporaConfig struct { - Raw map[string]interface{} `config:"raw"` - Path string `config:"path"` + Raw map[string]interface{} `config:"raw" json:"raw"` + Path string `config:"path" json:"path"` } type corporaFields struct { - Raw map[string]interface{} `config:"raw"` - Path string `config:"path"` + Raw map[string]interface{} `config:"raw" json:"raw"` + Path string `config:"path" json:"path"` } func defaultConfig() *scenario { - return &scenario{} + timeout := 10 * time.Minute + return &scenario{ + WaitForDataTimeout: &timeout, + } } func readConfig(path, scenario string, ctxt servicedeployer.ServiceContext) (*scenario, error) { diff --git a/internal/benchrunner/runners/system/servicedeployer/compose.go b/internal/benchrunner/runners/system/servicedeployer/compose.go deleted file mode 100644 index 03c3ca7ea0..0000000000 --- a/internal/benchrunner/runners/system/servicedeployer/compose.go +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -import ( - "fmt" - "os" - "path/filepath" - "time" - - "github.com/elastic/elastic-package/internal/builder" - "github.com/elastic/elastic-package/internal/compose" - "github.com/elastic/elastic-package/internal/docker" - "github.com/elastic/elastic-package/internal/files" - "github.com/elastic/elastic-package/internal/logger" - "github.com/elastic/elastic-package/internal/profile" - "github.com/elastic/elastic-package/internal/stack" -) - -// DockerComposeServiceDeployer knows how to deploy a service defined via -// a Docker Compose file. -type DockerComposeServiceDeployer struct { - profile *profile.Profile - ymlPaths []string -} - -type dockerComposeDeployedService struct { - ctxt ServiceContext - - ymlPaths []string - project string -} - -// NewDockerComposeServiceDeployer returns a new instance of a DockerComposeServiceDeployer. -func NewDockerComposeServiceDeployer(profile *profile.Profile, ymlPaths []string) (*DockerComposeServiceDeployer, error) { - return &DockerComposeServiceDeployer{ - profile: profile, - ymlPaths: ymlPaths, - }, nil -} - -// SetUp sets up the service and returns any relevant information. -func (d *DockerComposeServiceDeployer) SetUp(inCtxt ServiceContext) (DeployedService, error) { - logger.Debug("setting up service using Docker Compose service deployer") - service := dockerComposeDeployedService{ - ymlPaths: d.ymlPaths, - project: "elastic-package-service", - } - outCtxt := inCtxt - - p, err := compose.NewProject(service.project, service.ymlPaths...) 
- if err != nil { - return nil, fmt.Errorf("could not create Docker Compose project for service: %w", err) - } - - // Verify the Elastic stack network - err = stack.EnsureStackNetworkUp(d.profile) - if err != nil { - return nil, fmt.Errorf("elastic stack network is not ready: %w", err) - } - - // Clean service logs - err = files.RemoveContent(outCtxt.Logs.Folder.Local) - if err != nil { - return nil, fmt.Errorf("removing service logs failed: %w", err) - } - - serviceName := inCtxt.Name - opts := compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, outCtxt.Logs.Folder.Local)}, - ExtraArgs: []string{"--build", "-d"}, - } - err = p.Up(opts) - if err != nil { - return nil, fmt.Errorf("could not boot up service using Docker Compose: %w", err) - } - err = p.WaitForHealthy(opts) - if err != nil { - processServiceContainerLogs(p, compose.CommandOptions{ - Env: opts.Env, - }, outCtxt.Name) - return nil, fmt.Errorf("service is unhealthy: %w", err) - } - - // Build service container name - outCtxt.Hostname = p.ContainerName(serviceName) - - // Connect service network with stack network (for the purpose of metrics collection) - err = docker.ConnectToNetwork(p.ContainerName(serviceName), stack.Network(d.profile)) - if err != nil { - return nil, fmt.Errorf("can't attach service container to the stack network: %w", err) - } - - logger.Debugf("adding service container %s internal ports to context", p.ContainerName(serviceName)) - serviceComposeConfig, err := p.Config(compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, outCtxt.Logs.Folder.Local)}, - }) - if err != nil { - return nil, fmt.Errorf("could not get Docker Compose configuration for service: %w", err) - } - - s := serviceComposeConfig.Services[serviceName] - outCtxt.Ports = make([]int, len(s.Ports)) - for idx, port := range s.Ports { - outCtxt.Ports[idx] = port.InternalPort - } - - // Shortcut to first port for convenience - if len(outCtxt.Ports) > 0 { - outCtxt.Port = 
outCtxt.Ports[0] - } - - outCtxt.Agent.Host.NamePrefix = "docker-fleet-agent" - service.ctxt = outCtxt - return &service, nil -} - -// Signal sends a signal to the service. -func (s *dockerComposeDeployedService) Signal(signal string) error { - p, err := compose.NewProject(s.project, s.ymlPaths...) - if err != nil { - return fmt.Errorf("could not create Docker Compose project for service: %w", err) - } - - opts := compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, s.ctxt.Logs.Folder.Local)}, - ExtraArgs: []string{"-s", signal}, - } - if s.ctxt.Name != "" { - opts.Services = append(opts.Services, s.ctxt.Name) - } - - if err := p.Kill(opts); err != nil { - return fmt.Errorf("could not send %q signal: %w", signal, err) - } - return nil -} - -// TearDown tears down the service. -func (s *dockerComposeDeployedService) TearDown() error { - logger.Debugf("tearing down service using Docker Compose runner") - defer func() { - err := files.RemoveContent(s.ctxt.Logs.Folder.Local) - if err != nil { - logger.Errorf("could not remove the service logs (path: %s)", s.ctxt.Logs.Folder.Local) - } - }() - - p, err := compose.NewProject(s.project, s.ymlPaths...) - if err != nil { - return fmt.Errorf("could not create Docker Compose project for service: %w", err) - } - - opts := compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, s.ctxt.Logs.Folder.Local)}, - } - processServiceContainerLogs(p, opts, s.ctxt.Name) - - if err := p.Down(compose.CommandOptions{ - Env: []string{fmt.Sprintf("%s=%s", ServiceLogsDirEnv, s.ctxt.Logs.Folder.Local)}, - ExtraArgs: []string{"--volumes"}, // Remove associated volumes. - }); err != nil { - return fmt.Errorf("could not shut down service using Docker Compose: %w", err) - } - return nil -} - -// Context returns the current context for the service. -func (s *dockerComposeDeployedService) Context() ServiceContext { - return s.ctxt -} - -// SetContext sets the current context for the service. 
-func (s *dockerComposeDeployedService) SetContext(ctxt ServiceContext) error { - s.ctxt = ctxt - return nil -} - -func processServiceContainerLogs(p *compose.Project, opts compose.CommandOptions, serviceName string) { - content, err := p.Logs(opts) - if err != nil { - logger.Errorf("can't export service logs: %v", err) - return - } - - if len(content) == 0 { - logger.Info("service container hasn't written anything logs.") - return - } - - err = writeServiceContainerLogs(serviceName, content) - if err != nil { - logger.Errorf("can't write service container logs: %v", err) - } -} - -func writeServiceContainerLogs(serviceName string, content []byte) error { - buildDir, err := builder.BuildDirectory() - if err != nil { - return fmt.Errorf("locating build directory failed: %w", err) - } - - containerLogsDir := filepath.Join(buildDir, "container-logs") - err = os.MkdirAll(containerLogsDir, 0755) - if err != nil { - return fmt.Errorf("can't create directory for service container logs (path: %s): %w", containerLogsDir, err) - } - - containerLogsFilepath := filepath.Join(containerLogsDir, fmt.Sprintf("%s-%d.log", serviceName, time.Now().UnixNano())) - logger.Infof("Write container logs to file: %s", containerLogsFilepath) - err = os.WriteFile(containerLogsFilepath, content, 0644) - if err != nil { - return fmt.Errorf("can't write container logs to file (path: %s): %w", containerLogsFilepath, err) - } - return nil -} diff --git a/internal/benchrunner/runners/system/servicedeployer/context.go b/internal/benchrunner/runners/system/servicedeployer/context.go deleted file mode 100644 index d9c0aabf5c..0000000000 --- a/internal/benchrunner/runners/system/servicedeployer/context.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -package servicedeployer - -const ( - LocalCACertEnv = "LOCAL_CA_CERT" - ServiceLogsDirEnv = "SERVICE_LOGS_DIR" - BenchRunIDEnv = "BENCH_RUN_ID" -) - -// ServiceContext encapsulates context that is both available to a ServiceDeployer and -// populated by a DeployedService. The fields in ServiceContext may be used in handlebars -// templates in system benchmark configuration files, for example: {{ Hostname }}. -type ServiceContext struct { - // Name is the name of the service. - Name string - - // Hostname is the host name of the service, as addressable from - // the Agent container. - Hostname string - - // Ports is a list of ports that the service listens on, as addressable - // from the Agent container. - Ports []int - - // Port points to the first port in the list of ports. It's provided as - // a convenient shortcut as most services tend to listen on a single port. - Port int - - // Logs contains folder paths for log files produced by the service. - Logs struct { - Folder struct { - // Local contains the folder path where log files produced by - // the service are stored on the local filesystem, i.e. where - // elastic-package is running. - Local string - - // Agent contains the folder path where log files produced by - // the service are stored on the Agent container's filesystem. - Agent string - } - } - - // Bench related properties. - Bench struct { - // RunID identifies the current benchmark run. - RunID string - } - - // Agent related properties. - Agent struct { - // Host describes the machine which is running the agent. - Host struct { - // Name prefix for the host's name - NamePrefix string - } - } - - // CustomProperties store additional data used to boot up the service, e.g. AWS credentials. - CustomProperties map[string]interface{} -} - -// Aliases method returned aliases to properties of the service context. 
-func (sc *ServiceContext) Aliases() map[string]interface{} { - m := map[string]interface{}{ - ServiceLogsDirEnv: func() interface{} { - return sc.Logs.Folder.Agent - }, - BenchRunIDEnv: func() interface{} { - return sc.Bench.RunID - }, - } - - for k, v := range sc.CustomProperties { - var that = v - m[k] = func() interface{} { // wrap as function - return that - } - } - return m -} diff --git a/internal/benchrunner/runners/system/servicedeployer/factory.go b/internal/benchrunner/runners/system/servicedeployer/factory.go deleted file mode 100644 index c36e1db6ce..0000000000 --- a/internal/benchrunner/runners/system/servicedeployer/factory.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -import ( - "errors" - "fmt" - "os" - "path/filepath" - - "github.com/elastic/elastic-package/internal/profile" -) - -const devDeployDir = "_dev/benchmark/system/deploy" - -// FactoryOptions defines options used to create an instance of a service deployer. -type FactoryOptions struct { - Profile *profile.Profile - - RootPath string -} - -// Factory chooses the appropriate service runner for the given data stream, depending -// on service configuration files defined in the package or data stream. 
-func Factory(options FactoryOptions) (ServiceDeployer, error) { - devDeployPath, err := FindDevDeployPath(options) - if err != nil { - return nil, fmt.Errorf("can't find \"%s\" directory: %w", devDeployDir, err) - } - - serviceDeployerName, err := findServiceDeployer(devDeployPath) - if err != nil { - return nil, fmt.Errorf("can't find any valid service deployer: %w", err) - } - - serviceDeployerPath := filepath.Join(devDeployPath, serviceDeployerName) - - switch serviceDeployerName { - case "docker": - dockerComposeYMLPath := filepath.Join(serviceDeployerPath, "docker-compose.yml") - if _, err := os.Stat(dockerComposeYMLPath); err == nil { - return NewDockerComposeServiceDeployer(options.Profile, []string{dockerComposeYMLPath}) - } - } - return nil, fmt.Errorf("unsupported service deployer (name: %s)", serviceDeployerName) -} - -// FindDevDeployPath function returns a path reference to the "_dev/deploy" directory. -func FindDevDeployPath(options FactoryOptions) (string, error) { - path := filepath.Join(options.RootPath, devDeployDir) - if _, err := os.Stat(path); err == nil { - return path, nil - } else if !errors.Is(err, os.ErrNotExist) { - return "", fmt.Errorf("stat failed for path (path: %s): %w", path, err) - } - return "", fmt.Errorf("\"%s\" directory doesn't exist", devDeployDir) -} - -func findServiceDeployer(devDeployPath string) (string, error) { - fis, err := os.ReadDir(devDeployPath) - if err != nil { - return "", fmt.Errorf("can't read directory (path: %s): %w", devDeployDir, err) - } - - var folders []os.DirEntry - for _, fi := range fis { - if fi.IsDir() { - folders = append(folders, fi) - } - } - - if len(folders) != 1 { - return "", fmt.Errorf("expected to find only one service deployer in \"%s\"", devDeployPath) - } - return folders[0].Name(), nil -} diff --git a/internal/elasticsearch/ingest/diskusage.go b/internal/elasticsearch/ingest/diskusage.go index b6be4f644d..7c8f334b1a 100644 --- a/internal/elasticsearch/ingest/diskusage.go +++ 
b/internal/elasticsearch/ingest/diskusage.go @@ -13,29 +13,20 @@ import ( ) type DiskUsageStat struct { - Total string `json:"total"` - TotalInBytes int `json:"total_in_bytes"` + TotalInBytes uint64 `json:"total_in_bytes"` InvertedIndex struct { - Total string `json:"total"` - TotalInBytes int `json:"total_in_bytes"` + TotalInBytes uint64 `json:"total_in_bytes"` } `json:"inverted_index"` - StoredFields string `json:"stored_fields"` - StoredFieldsInBytes int `json:"stored_fields_in_bytes"` - DocValues string `json:"doc_values"` - DocValuesInBytes int `json:"doc_values_in_bytes"` - Points string `json:"points"` - PointsInBytes int `json:"points_in_bytes"` - Norms string `json:"norms"` - NormsInBytes int `json:"norms_in_bytes"` - TermVectors string `json:"term_vectors"` - TermVectorsInBytes int `json:"term_vectors_in_bytes"` - KnnVectors string `json:"knn_vectors"` - KnnVectorsInBytes int `json:"knn_vectors_in_bytes"` + StoredFieldsInBytes uint64 `json:"stored_fields_in_bytes"` + DocValuesInBytes uint64 `json:"doc_values_in_bytes"` + PointsInBytes uint64 `json:"points_in_bytes"` + NormsInBytes uint64 `json:"norms_in_bytes"` + TermVectorsInBytes uint64 `json:"term_vectors_in_bytes"` + KnnVectorsInBytes uint64 `json:"knn_vectors_in_bytes"` } type DiskUsage struct { - StoreSize string `json:"store_size"` - StoreSizeInBytes int `json:"store_size_in_bytes"` + StoreSizeInBytes uint64 `json:"store_size_in_bytes"` AllFields DiskUsageStat `json:"all_fields"` Fields map[string]DiskUsageStat `json:"fields"` } diff --git a/internal/elasticsearch/ingest/nodestats.go b/internal/elasticsearch/ingest/nodestats.go index f827759aa7..4b9da276bf 100644 --- a/internal/elasticsearch/ingest/nodestats.go +++ b/internal/elasticsearch/ingest/nodestats.go @@ -199,9 +199,7 @@ type NodesStats struct { type NodeStats struct { Breakers map[string]struct { LimitSizeInBytes int `json:"limit_size_in_bytes"` - LimitSize string `json:"limit_size"` EstimatedSizeInBytes int 
`json:"estimated_size_in_bytes"` - EstimatedSize string `json:"estimated_size"` Overhead float64 `json:"overhead"` Tripped int `json:"tripped"` } diff --git a/internal/kibana/policies.go b/internal/kibana/policies.go index da0fd52ab5..8ed3c37029 100644 --- a/internal/kibana/policies.go +++ b/internal/kibana/policies.go @@ -227,13 +227,13 @@ type PackagePolicy struct { Name string `json:"name"` Version string `json:"version"` } `json:"package"` - Vars map[string]interface{} `json:"vars,omitempty"` Inputs map[string]PackagePolicyInput `json:"inputs,omitempty"` Force bool `json:"force"` } type PackagePolicyInput struct { Enabled bool `json:"enabled"` + Vars map[string]interface{} `json:"vars,omitempty"` Streams map[string]PackagePolicyStream `json:"streams,omitempty"` } diff --git a/internal/service/boot.go b/internal/service/boot.go index d268d09ccb..b71750d234 100644 --- a/internal/service/boot.go +++ b/internal/service/boot.go @@ -14,8 +14,8 @@ import ( "github.com/elastic/elastic-package/internal/profile" "github.com/elastic/elastic-package/internal/configuration/locations" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/testrunner/runners/system" - "github.com/elastic/elastic-package/internal/testrunner/runners/system/servicedeployer" ) // Options define the details of the service which should be booted up. 
@@ -24,6 +24,7 @@ type Options struct { ServiceName string PackageRootPath string + DevDeployDir string DataStreamRootPath string Variant string @@ -36,6 +37,7 @@ func BootUp(options Options) error { Profile: options.Profile, PackageRootPath: options.DataStreamRootPath, DataStreamRootPath: options.DataStreamRootPath, + DevDeployDir: options.DevDeployDir, Variant: options.Variant, }) if err != nil { diff --git a/internal/testrunner/runners/system/servicedeployer/_static/Dockerfile.terraform_deployer b/internal/servicedeployer/_static/Dockerfile.terraform_deployer similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/_static/Dockerfile.terraform_deployer rename to internal/servicedeployer/_static/Dockerfile.terraform_deployer diff --git a/internal/testrunner/runners/system/servicedeployer/_static/docker-custom-agent-base.yml b/internal/servicedeployer/_static/docker-custom-agent-base.yml similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/_static/docker-custom-agent-base.yml rename to internal/servicedeployer/_static/docker-custom-agent-base.yml diff --git a/internal/testrunner/runners/system/servicedeployer/_static/terraform_deployer.yml b/internal/servicedeployer/_static/terraform_deployer.yml similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/_static/terraform_deployer.yml rename to internal/servicedeployer/_static/terraform_deployer.yml diff --git a/internal/testrunner/runners/system/servicedeployer/_static/terraform_deployer_run.sh b/internal/servicedeployer/_static/terraform_deployer_run.sh similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/_static/terraform_deployer_run.sh rename to internal/servicedeployer/_static/terraform_deployer_run.sh diff --git a/internal/testrunner/runners/system/servicedeployer/compose.go b/internal/servicedeployer/compose.go similarity index 100% rename from 
internal/testrunner/runners/system/servicedeployer/compose.go rename to internal/servicedeployer/compose.go diff --git a/internal/testrunner/runners/system/servicedeployer/context.go b/internal/servicedeployer/context.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/context.go rename to internal/servicedeployer/context.go diff --git a/internal/testrunner/runners/system/servicedeployer/custom_agent.go b/internal/servicedeployer/custom_agent.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/custom_agent.go rename to internal/servicedeployer/custom_agent.go diff --git a/internal/benchrunner/runners/system/servicedeployer/deployed_service.go b/internal/servicedeployer/deployed_service.go similarity index 100% rename from internal/benchrunner/runners/system/servicedeployer/deployed_service.go rename to internal/servicedeployer/deployed_service.go diff --git a/internal/testrunner/runners/system/servicedeployer/elastic-agent-managed.yaml.tmpl b/internal/servicedeployer/elastic-agent-managed.yaml.tmpl similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/elastic-agent-managed.yaml.tmpl rename to internal/servicedeployer/elastic-agent-managed.yaml.tmpl diff --git a/internal/testrunner/runners/system/servicedeployer/factory.go b/internal/servicedeployer/factory.go similarity index 85% rename from internal/testrunner/runners/system/servicedeployer/factory.go rename to internal/servicedeployer/factory.go index b459ecd352..00021017be 100644 --- a/internal/testrunner/runners/system/servicedeployer/factory.go +++ b/internal/servicedeployer/factory.go @@ -13,7 +13,10 @@ import ( "github.com/elastic/elastic-package/internal/profile" ) -const devDeployDir = "_dev/deploy" +const ( + TypeTest = "test" + TypeBench = "bench" +) // FactoryOptions defines options used to create an instance of a service deployer. 
type FactoryOptions struct { @@ -21,6 +24,8 @@ type FactoryOptions struct { PackageRootPath string DataStreamRootPath string + DevDeployDir string + Type string Variant string } @@ -30,7 +35,7 @@ type FactoryOptions struct { func Factory(options FactoryOptions) (ServiceDeployer, error) { devDeployPath, err := FindDevDeployPath(options) if err != nil { - return nil, fmt.Errorf("can't find \"%s\" directory: %w", devDeployDir, err) + return nil, fmt.Errorf("can't find \"%s\" directory: %w", options.DevDeployDir, err) } serviceDeployerName, err := findServiceDeployer(devDeployPath) @@ -55,6 +60,9 @@ func Factory(options FactoryOptions) (ServiceDeployer, error) { return NewDockerComposeServiceDeployer(options.Profile, []string{dockerComposeYMLPath}, sv) } case "agent": + if options.Type != TypeTest { + return nil, fmt.Errorf("agent deployer is not supported for type %s", options.Type) + } customAgentCfgYMLPath := filepath.Join(serviceDeployerPath, "custom-agent.yml") if _, err := os.Stat(customAgentCfgYMLPath); err != nil { return nil, fmt.Errorf("can't find expected file custom-agent.yml: %w", err) @@ -71,28 +79,27 @@ func Factory(options FactoryOptions) (ServiceDeployer, error) { // FindDevDeployPath function returns a path reference to the "_dev/deploy" directory. 
func FindDevDeployPath(options FactoryOptions) (string, error) { - dataStreamDevDeployPath := filepath.Join(options.DataStreamRootPath, devDeployDir) - _, err := os.Stat(dataStreamDevDeployPath) - if err == nil { + dataStreamDevDeployPath := filepath.Join(options.DataStreamRootPath, options.DevDeployDir) + if _, err := os.Stat(dataStreamDevDeployPath); err == nil { return dataStreamDevDeployPath, nil } else if !errors.Is(err, os.ErrNotExist) { return "", fmt.Errorf("stat failed for data stream (path: %s): %w", dataStreamDevDeployPath, err) } - packageDevDeployPath := filepath.Join(options.PackageRootPath, devDeployDir) - _, err = os.Stat(packageDevDeployPath) - if err == nil { + packageDevDeployPath := filepath.Join(options.PackageRootPath, options.DevDeployDir) + if _, err := os.Stat(packageDevDeployPath); err == nil { return packageDevDeployPath, nil } else if !errors.Is(err, os.ErrNotExist) { return "", fmt.Errorf("stat failed for package (path: %s): %w", packageDevDeployPath, err) } - return "", fmt.Errorf("\"%s\" directory doesn't exist", devDeployDir) + + return "", fmt.Errorf("\"%s\" directory doesn't exist", options.DevDeployDir) } func findServiceDeployer(devDeployPath string) (string, error) { fis, err := os.ReadDir(devDeployPath) if err != nil { - return "", fmt.Errorf("can't read directory (path: %s): %w", devDeployDir, err) + return "", fmt.Errorf("can't read directory (path: %s): %w", devDeployPath, err) } var folders []os.DirEntry diff --git a/internal/testrunner/runners/system/servicedeployer/kubernetes.go b/internal/servicedeployer/kubernetes.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/kubernetes.go rename to internal/servicedeployer/kubernetes.go diff --git a/internal/benchrunner/runners/system/servicedeployer/service_deployer.go b/internal/servicedeployer/service_deployer.go similarity index 100% rename from internal/benchrunner/runners/system/servicedeployer/service_deployer.go rename to 
internal/servicedeployer/service_deployer.go diff --git a/internal/testrunner/runners/system/servicedeployer/terraform.go b/internal/servicedeployer/terraform.go similarity index 95% rename from internal/testrunner/runners/system/servicedeployer/terraform.go rename to internal/servicedeployer/terraform.go index 1a61125224..8bbb0c65d4 100644 --- a/internal/testrunner/runners/system/servicedeployer/terraform.go +++ b/internal/servicedeployer/terraform.go @@ -210,4 +210,12 @@ func (tsd TerraformServiceDeployer) installDockerfile() (string, error) { return tfDir, nil } +func CreateOutputDir(locationManager *locations.LocationManager, runId string) (string, error) { + outputDir := filepath.Join(locationManager.ServiceOutputDir(), runId) + if err := os.MkdirAll(outputDir, 0755); err != nil { + return "", fmt.Errorf("failed to create output directory: %w", err) + } + return outputDir, nil +} + var _ ServiceDeployer = new(TerraformServiceDeployer) diff --git a/internal/testrunner/runners/system/servicedeployer/terraform_env.go b/internal/servicedeployer/terraform_env.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/terraform_env.go rename to internal/servicedeployer/terraform_env.go diff --git a/internal/testrunner/runners/system/servicedeployer/terraform_test.go b/internal/servicedeployer/terraform_test.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/terraform_test.go rename to internal/servicedeployer/terraform_test.go diff --git a/internal/testrunner/runners/system/servicedeployer/variants.go b/internal/servicedeployer/variants.go similarity index 100% rename from internal/testrunner/runners/system/servicedeployer/variants.go rename to internal/servicedeployer/variants.go diff --git a/internal/testrunner/runners/system/runner.go b/internal/testrunner/runners/system/runner.go index 915f530f0e..de99c6aa6b 100644 --- a/internal/testrunner/runners/system/runner.go +++ 
b/internal/testrunner/runners/system/runner.go @@ -27,10 +27,10 @@ import ( "github.com/elastic/elastic-package/internal/multierror" "github.com/elastic/elastic-package/internal/packages" "github.com/elastic/elastic-package/internal/packages/installer" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/signal" "github.com/elastic/elastic-package/internal/stack" "github.com/elastic/elastic-package/internal/testrunner" - "github.com/elastic/elastic-package/internal/testrunner/runners/system/servicedeployer" ) const ( @@ -38,6 +38,7 @@ const ( testRunMinID = 10000 allFieldsBody = `{"fields": ["*"]}` + DevDeployDir = "_dev/deploy" ) func init() { @@ -204,6 +205,7 @@ func (r *runner) run() (results []testrunner.TestResult, err error) { Profile: r.options.Profile, PackageRootPath: r.options.PackageRootPath, DataStreamRootPath: dataStreamPath, + DevDeployDir: DevDeployDir, }) if err != nil { return result.WithError(fmt.Errorf("_dev/deploy directory not found: %w", err)) @@ -256,7 +258,9 @@ func (r *runner) runTestPerVariant(result *testrunner.ResultComposer, locationMa Profile: r.options.Profile, PackageRootPath: r.options.PackageRootPath, DataStreamRootPath: dataStreamPath, + DevDeployDir: DevDeployDir, Variant: variantName, + Type: servicedeployer.TypeTest, } var ctxt servicedeployer.ServiceContext @@ -265,7 +269,7 @@ func (r *runner) runTestPerVariant(result *testrunner.ResultComposer, locationMa ctxt.Logs.Folder.Agent = ServiceLogsAgentDir ctxt.Test.RunID = createTestRunID() - outputDir, err := createOutputDir(locationManager, ctxt.Test.RunID) + outputDir, err := servicedeployer.CreateOutputDir(locationManager, ctxt.Test.RunID) if err != nil { return nil, fmt.Errorf("could not create output dir for terraform deployer %w", err) } @@ -298,14 +302,6 @@ func (r *runner) runTestPerVariant(result *testrunner.ResultComposer, locationMa return partial, nil } -func createOutputDir(locationManager 
*locations.LocationManager, runId string) (string, error) { - outputDir := filepath.Join(locationManager.ServiceOutputDir(), runId) - if err := os.MkdirAll(outputDir, 0755); err != nil { - return "", fmt.Errorf("failed to create output directory: %w", err) - } - return outputDir, nil -} - func createTestRunID() string { return fmt.Sprintf("%d", rand.Intn(testRunMaxID-testRunMinID)+testRunMinID) } diff --git a/internal/testrunner/runners/system/servicedeployer/deployed_service.go b/internal/testrunner/runners/system/servicedeployer/deployed_service.go deleted file mode 100644 index ebd1a87f9e..0000000000 --- a/internal/testrunner/runners/system/servicedeployer/deployed_service.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -// DeployedService defines the interface for interacting with a service that has been deployed. -type DeployedService interface { - // TearDown implements the logic for tearing down a service. - TearDown() error - - // Signal sends a signal to the service. - Signal(signal string) error - - // Context returns the current context from the service. - Context() ServiceContext - - // SetContext sets the current context for the service. - SetContext(str ServiceContext) error -} diff --git a/internal/testrunner/runners/system/servicedeployer/service_deployer.go b/internal/testrunner/runners/system/servicedeployer/service_deployer.go deleted file mode 100644 index 5e1cb93afe..0000000000 --- a/internal/testrunner/runners/system/servicedeployer/service_deployer.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. 
Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package servicedeployer - -// ServiceDeployer defines the interface for deploying a service. It defines methods for -// controlling the lifecycle of a service. -type ServiceDeployer interface { - // SetUp implements the logic for setting up a service. It takes a context and returns a - // ServiceHandler. - SetUp(ctxt ServiceContext) (DeployedService, error) -} diff --git a/internal/testrunner/runners/system/test_config.go b/internal/testrunner/runners/system/test_config.go index 63c374ba3a..6a55f9d27d 100644 --- a/internal/testrunner/runners/system/test_config.go +++ b/internal/testrunner/runners/system/test_config.go @@ -19,8 +19,8 @@ import ( "github.com/elastic/go-ucfg/yaml" "github.com/elastic/elastic-package/internal/common" + "github.com/elastic/elastic-package/internal/servicedeployer" "github.com/elastic/elastic-package/internal/testrunner" - "github.com/elastic/elastic-package/internal/testrunner/runners/system/servicedeployer" ) var systemTestConfigFilePattern = regexp.MustCompile(`^test-([a-z0-9_.-]+)-config.yml$`) diff --git a/scripts/test-check-packages.sh b/scripts/test-check-packages.sh index 314c005204..c62c8593df 100755 --- a/scripts/test-check-packages.sh +++ b/scripts/test-check-packages.sh @@ -85,10 +85,9 @@ for d in test/packages/${PACKAGE_TEST_TYPE:-other}/${PACKAGE_UNDER_TEST:-*}/; do --old ${OLDPWD}/build/benchmark-results-old \ --threshold 1 --report-output-path="${OLDPWD}/build/benchreport" fi - # FIXME: running system benchmark in package "system_benchmark" fails with panic - # if [ "${package_to_test}" == "system_benchmark" ]; then - # elastic-package benchmark system --benchmark logs-benchmark -v --defer-cleanup 1s - # fi + if [ "${package_to_test}" == "system_benchmark" ]; then + elastic-package benchmark system --benchmark logs-benchmark -v --defer-cleanup 1s + fi else # defer-cleanup is set to a short period to verify 
that the option is available elastic-package test -v --report-format xUnit --report-output file --defer-cleanup 1s --test-coverage diff --git a/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml b/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml index 1673942093..56d25b1571 100644 --- a/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml +++ b/test/packages/benchmarks/system_benchmark/_dev/benchmark/system/logs-benchmark.yml @@ -6,6 +6,7 @@ data_stream.name: testds data_stream.vars.paths: - "{{SERVICE_LOGS_DIR}}/corpus-*" warmup_time_period: 10s +wait_for_data_timeout: 10m corpora.generator.size: 20MiB corpora.generator.template.path: ./logs-benchmark/template.log corpora.generator.config.path: ./logs-benchmark/config.yml