From a01fcb60a14676fb7e76c360ff607abd4da97804 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 17 Nov 2023 10:14:51 +0800 Subject: [PATCH] Remove wait_for_integration precondition We'll switch to setting require_data_stream when it's available: https://github.com/elastic/elasticsearch/pull/101872 --- internal/beater/beater.go | 12 -- internal/beater/config/config_test.go | 9 +- internal/beater/config/data_streams.go | 13 +- internal/beater/server_test.go | 209 ------------------------- 4 files changed, 4 insertions(+), 239 deletions(-) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index f6c7f5e4d53..0d1a71f4501 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -639,18 +639,6 @@ func (s *Runner) waitReady( }) } - // When running standalone with data streams enabled, by default we will add - // a precondition that ensures the integration is installed. - fleetManaged := s.fleetConfig != nil - if !fleetManaged && s.config.DataStreams.WaitForIntegration { - if kibanaClient == nil && esOutputClient == nil { - return errors.New("cannot wait for integration without either Kibana or Elasticsearch config") - } - preconditions = append(preconditions, func(ctx context.Context) error { - return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger) - }) - } - if len(preconditions) == 0 { return nil } diff --git a/internal/beater/config/config_test.go b/internal/beater/config/config_test.go index 71b4fd03a05..7fb5cc87a5d 100644 --- a/internal/beater/config/config_test.go +++ b/internal/beater/config/config_test.go @@ -364,8 +364,7 @@ func TestUnpackConfig(t *testing.T) { }, DefaultServiceEnvironment: "overridden", DataStreams: DataStreamsConfig{ - Namespace: "default", - WaitForIntegration: true, + Namespace: "default", }, WaitReadyInterval: 5 * time.Second, }, @@ -413,8 +412,7 @@ func TestUnpackConfig(t *testing.T) { "storage_limit": "1GB", }, "data_streams": map[string]interface{}{ - "namespace": "foo", - "wait_for_integration": false, + "namespace": "foo", }, }, outCfg: &Config{ @@ -498,8 +496,7 @@ func TestUnpackConfig(t *testing.T) { }, }, DataStreams: DataStreamsConfig{ - Namespace: "foo", - WaitForIntegration: false, + Namespace: "foo", }, WaitReadyInterval: 5 * time.Second, }, diff --git a/internal/beater/config/data_streams.go b/internal/beater/config/data_streams.go index 0a0456eaa4c..0dbc6d53267 100644 --- a/internal/beater/config/data_streams.go +++ b/internal/beater/config/data_streams.go @@ -20,21 +20,10 @@ package config // DataStreamsConfig holds data streams configuration. type DataStreamsConfig struct { Namespace string `config:"namespace"` - - // WaitForIntegration controls whether APM Server waits for the Fleet - // integration package to be installed before indexing events. - // - // This config is ignored when running under Elastic Agent; it is intended - // for running APM Server standalone, relying on Fleet to install the integration - // for creating Elasticsearch index templates, ILM policies, and ingest pipelines. - // - // This configuration requires either a connection to Kibana or Elasticsearch. - WaitForIntegration bool `config:"wait_for_integration"` } func defaultDataStreamsConfig() DataStreamsConfig { return DataStreamsConfig{ - Namespace: "default", - WaitForIntegration: true, + Namespace: "default", } } diff --git a/internal/beater/server_test.go b/internal/beater/server_test.go index 1249a41c4af..d62bb3d892b 100644 --- a/internal/beater/server_test.go +++ b/internal/beater/server_test.go @@ -28,13 +28,10 @@ import ( "net/http/httptest" "net/url" "os" - "path" "path/filepath" "reflect" "runtime" "strings" - "sync" - "sync/atomic" "testing" "time" @@ -330,212 +327,6 @@ func TestServerOTLPGRPC(t *testing.T) { assert.NoError(t, err) } -func TestServerWaitForIntegrationKibana(t *testing.T) { - var requests int64 - requestCh := make(chan struct{}) - mux := http.NewServeMux() - mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(`{"version":{"number":"1.2.3"}}`)) - }) - mux.HandleFunc("/api/fleet/epm/packages/apm", func(w http.ResponseWriter, r *http.Request) { - switch atomic.AddInt64(&requests, 1) { - case 1: - w.WriteHeader(500) - case 2: - fmt.Fprintln(w, `{"response":{"status":"not_installed"}}`) - case 3: - fmt.Fprintln(w, `{"response":{"status":"installed"}}`) - } - select { - case requestCh <- struct{}{}: - case <-r.Context().Done(): - } - }) - kibanaServer := httptest.NewServer(mux) - defer kibanaServer.Close() - - srv := beatertest.NewServer(t, beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "apm-server": map[string]interface{}{ - "wait_ready_interval": "100ms", - "kibana.enabled": true, - "kibana.host": kibanaServer.URL, - "data_streams.wait_for_integration": true, - }, - }))) - - timeout := time.After(10 * time.Second) - for i := 0; i < 3; i++ { - select { - case <-requestCh: - case <-timeout: - t.Fatal("timed out waiting for request") - } - } - - // TODO(axw) there _should_ be just 2 logs, but there might be an initial - // log message due to the Kibana client connecting asynchronously. We should - // update internal/kibana to remove the async behaviour. - logs := srv.Logs.FilterMessageSnippet("please install the apm integration") - assert.NotZero(t, logs.Len()) - - select { - case <-requestCh: - t.Fatal("unexpected request") - case <-time.After(50 * time.Millisecond): - } -} - -func TestServerWaitForIntegrationElasticsearch(t *testing.T) { - var mu sync.Mutex - var tracesRequests int - tracesRequestsCh := make(chan int) - bulkCh := make(chan struct{}, 1) - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("X-Elastic-Product", "Elasticsearch") - // We must send a valid JSON response for the libbeat - // elasticsearch client to send bulk requests. - fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) - }) - mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) { - mu.Lock() - defer mu.Unlock() - template := path.Base(r.URL.Path) - if template == "traces-apm" { - tracesRequests++ - if tracesRequests == 1 { - w.WriteHeader(404) - } - tracesRequestsCh <- tracesRequests - } - }) - mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { - select { - case bulkCh <- struct{}{}: - default: - } - }) - elasticsearchServer := httptest.NewServer(mux) - defer elasticsearchServer.Close() - - srv := beatertest.NewServer(t, beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "apm-server": map[string]interface{}{ - "wait_ready_interval": "100ms", - "data_streams.wait_for_integration": true, - }, - "output.elasticsearch": map[string]interface{}{ - "hosts": []string{elasticsearchServer.URL}, - "backoff": map[string]interface{}{"init": "10ms", "max": "10ms"}, - "max_retries": 1000, - }, - }))) - - // Send some events to the server. They should be accepted and enqueued. - req := makeTransactionRequest(t, srv.URL) - req.Header.Add("Content-Type", "application/x-ndjson") - resp, err := srv.Client.Do(req) - assert.NoError(t, err) - assert.Equal(t, http.StatusAccepted, resp.StatusCode) - resp.Body.Close() - - // Healthcheck should report that the server is not publish-ready. - resp, err = srv.Client.Get(srv.URL + api.RootPath) - require.NoError(t, err) - out := decodeJSONMap(t, resp.Body) - resp.Body.Close() - assert.Equal(t, false, out["publish_ready"]) - - // Indexing should be blocked until we receive from tracesRequestsCh. - select { - case <-bulkCh: - t.Fatal("unexpected bulk request") - case <-time.After(50 * time.Millisecond): - } - - timeout := time.After(10 * time.Second) - var done bool - for !done { - select { - case n := <-tracesRequestsCh: - done = n == 2 - case <-timeout: - t.Fatal("timed out waiting for request") - } - } - - // libbeat should keep retrying, and finally succeed now it is unblocked. - select { - case <-bulkCh: - case <-time.After(10 * time.Second): - t.Fatal("timed out waiting for bulk request") - } - - logs := srv.Logs.FilterMessageSnippet("please install the apm integration") - assert.Len(t, logs.All(), 1, "couldn't find remediation message logs") - - // Healthcheck should now report that the server is publish-ready. - resp, err = srv.Client.Get(srv.URL + api.RootPath) - require.NoError(t, err) - out = decodeJSONMap(t, resp.Body) - resp.Body.Close() - assert.Equal(t, true, out["publish_ready"]) -} - -func TestServerFailedPreconditionDoesNotIndex(t *testing.T) { - bulkCh := make(chan struct{}, 1) - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("X-Elastic-Product", "Elasticsearch") - // We must send a valid JSON response for the libbeat - // elasticsearch client to send bulk requests. - fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) - }) - mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(404) - }) - mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { - select { - case bulkCh <- struct{}{}: - default: - } - }) - elasticsearchServer := httptest.NewServer(mux) - defer elasticsearchServer.Close() - - srv := beatertest.NewServer(t, beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{ - "apm-server": map[string]interface{}{ - "wait_ready_interval": "100ms", - "data_streams.wait_for_integration": true, - }, - "output.elasticsearch.hosts": []string{elasticsearchServer.URL}, - }))) - - // Send some events to the server. They should be accepted and enqueued. - req := makeTransactionRequest(t, srv.URL) - req.Header.Add("Content-Type", "application/x-ndjson") - resp, err := srv.Client.Do(req) - assert.NoError(t, err) - assert.Equal(t, http.StatusAccepted, resp.StatusCode) - resp.Body.Close() - - // Healthcheck should report that the server is not publish-ready. - resp, err = srv.Client.Get(srv.URL + api.RootPath) - require.NoError(t, err) - out := decodeJSONMap(t, resp.Body) - resp.Body.Close() - assert.Equal(t, false, out["publish_ready"]) - - // Stop the server. - srv.Close() - - // No documents should be indexed. - select { - case <-bulkCh: - t.Fatal("unexpected bulk request") - case <-time.After(50 * time.Millisecond): - } -} - func TestTailSamplingPlatinumLicense(t *testing.T) { bulkCh := make(chan struct{}, 1) licenseReq := make(chan struct{})