Skip to content

Commit

Permalink
Remove wait_for_integration precondition
Browse files Browse the repository at this point in the history
We'll switch to setting require_data_stream when
it's available:
elastic/elasticsearch#101872
  • Loading branch information
axw committed Nov 17, 2023
1 parent 79a0c70 commit a01fcb6
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 239 deletions.
12 changes: 0 additions & 12 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,18 +639,6 @@ func (s *Runner) waitReady(
})
}

// When running standalone with data streams enabled, by default we will add
// a precondition that ensures the integration is installed.
fleetManaged := s.fleetConfig != nil
if !fleetManaged && s.config.DataStreams.WaitForIntegration {
if kibanaClient == nil && esOutputClient == nil {
return errors.New("cannot wait for integration without either Kibana or Elasticsearch config")
}
preconditions = append(preconditions, func(ctx context.Context) error {
return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger)
})
}

if len(preconditions) == 0 {
return nil
}
Expand Down
9 changes: 3 additions & 6 deletions internal/beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,7 @@ func TestUnpackConfig(t *testing.T) {
},
DefaultServiceEnvironment: "overridden",
DataStreams: DataStreamsConfig{
Namespace: "default",
WaitForIntegration: true,
Namespace: "default",
},
WaitReadyInterval: 5 * time.Second,
},
Expand Down Expand Up @@ -413,8 +412,7 @@ func TestUnpackConfig(t *testing.T) {
"storage_limit": "1GB",
},
"data_streams": map[string]interface{}{
"namespace": "foo",
"wait_for_integration": false,
"namespace": "foo",
},
},
outCfg: &Config{
Expand Down Expand Up @@ -498,8 +496,7 @@ func TestUnpackConfig(t *testing.T) {
},
},
DataStreams: DataStreamsConfig{
Namespace: "foo",
WaitForIntegration: false,
Namespace: "foo",
},
WaitReadyInterval: 5 * time.Second,
},
Expand Down
13 changes: 1 addition & 12 deletions internal/beater/config/data_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,10 @@ package config
// DataStreamsConfig holds data streams configuration.
type DataStreamsConfig struct {
Namespace string `config:"namespace"`

// WaitForIntegration controls whether APM Server waits for the Fleet
// integration package to be installed before indexing events.
//
// This config is ignored when running under Elastic Agent; it is intended
// for running APM Server standalone, relying on Fleet to install the integration
// for creating Elasticsearch index templates, ILM policies, and ingest pipelines.
//
// This configuration requires either a connection to Kibana or Elasticsearch.
WaitForIntegration bool `config:"wait_for_integration"`
}

func defaultDataStreamsConfig() DataStreamsConfig {
return DataStreamsConfig{
Namespace: "default",
WaitForIntegration: true,
Namespace: "default",
}
}
209 changes: 0 additions & 209 deletions internal/beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ import (
"net/http/httptest"
"net/url"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -330,212 +327,6 @@ func TestServerOTLPGRPC(t *testing.T) {
assert.NoError(t, err)
}

func TestServerWaitForIntegrationKibana(t *testing.T) {
var requests int64
requestCh := make(chan struct{})
mux := http.NewServeMux()
mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"version":{"number":"1.2.3"}}`))
})
mux.HandleFunc("/api/fleet/epm/packages/apm", func(w http.ResponseWriter, r *http.Request) {
switch atomic.AddInt64(&requests, 1) {
case 1:
w.WriteHeader(500)
case 2:
fmt.Fprintln(w, `{"response":{"status":"not_installed"}}`)
case 3:
fmt.Fprintln(w, `{"response":{"status":"installed"}}`)
}
select {
case requestCh <- struct{}{}:
case <-r.Context().Done():
}
})
kibanaServer := httptest.NewServer(mux)
defer kibanaServer.Close()

srv := beatertest.NewServer(t, beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{
"apm-server": map[string]interface{}{
"wait_ready_interval": "100ms",
"kibana.enabled": true,
"kibana.host": kibanaServer.URL,
"data_streams.wait_for_integration": true,
},
})))

timeout := time.After(10 * time.Second)
for i := 0; i < 3; i++ {
select {
case <-requestCh:
case <-timeout:
t.Fatal("timed out waiting for request")
}
}

// TODO(axw) there _should_ be just 2 logs, but there might be an initial
// log message due to the Kibana client connecting asynchronously. We should
// update internal/kibana to remove the async behaviour.
logs := srv.Logs.FilterMessageSnippet("please install the apm integration")
assert.NotZero(t, logs.Len())

select {
case <-requestCh:
t.Fatal("unexpected request")
case <-time.After(50 * time.Millisecond):
}
}

func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var mu sync.Mutex
var tracesRequests int
tracesRequestsCh := make(chan int)
bulkCh := make(chan struct{}, 1)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
// We must send a valid JSON response for the libbeat
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
template := path.Base(r.URL.Path)
if template == "traces-apm" {
tracesRequests++
if tracesRequests == 1 {
w.WriteHeader(404)
}
tracesRequestsCh <- tracesRequests
}
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
select {
case bulkCh <- struct{}{}:
default:
}
})
elasticsearchServer := httptest.NewServer(mux)
defer elasticsearchServer.Close()

srv := beatertest.NewServer(t, beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{
"apm-server": map[string]interface{}{
"wait_ready_interval": "100ms",
"data_streams.wait_for_integration": true,
},
"output.elasticsearch": map[string]interface{}{
"hosts": []string{elasticsearchServer.URL},
"backoff": map[string]interface{}{"init": "10ms", "max": "10ms"},
"max_retries": 1000,
},
})))

// Send some events to the server. They should be accepted and enqueued.
req := makeTransactionRequest(t, srv.URL)
req.Header.Add("Content-Type", "application/x-ndjson")
resp, err := srv.Client.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusAccepted, resp.StatusCode)
resp.Body.Close()

// Healthcheck should report that the server is not publish-ready.
resp, err = srv.Client.Get(srv.URL + api.RootPath)
require.NoError(t, err)
out := decodeJSONMap(t, resp.Body)
resp.Body.Close()
assert.Equal(t, false, out["publish_ready"])

// Indexing should be blocked until we receive from tracesRequestsCh.
select {
case <-bulkCh:
t.Fatal("unexpected bulk request")
case <-time.After(50 * time.Millisecond):
}

timeout := time.After(10 * time.Second)
var done bool
for !done {
select {
case n := <-tracesRequestsCh:
done = n == 2
case <-timeout:
t.Fatal("timed out waiting for request")
}
}

// libbeat should keep retrying, and finally succeed now it is unblocked.
select {
case <-bulkCh:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for bulk request")
}

logs := srv.Logs.FilterMessageSnippet("please install the apm integration")
assert.Len(t, logs.All(), 1, "couldn't find remediation message logs")

// Healthcheck should now report that the server is publish-ready.
resp, err = srv.Client.Get(srv.URL + api.RootPath)
require.NoError(t, err)
out = decodeJSONMap(t, resp.Body)
resp.Body.Close()
assert.Equal(t, true, out["publish_ready"])
}

func TestServerFailedPreconditionDoesNotIndex(t *testing.T) {
bulkCh := make(chan struct{}, 1)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
// We must send a valid JSON response for the libbeat
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
select {
case bulkCh <- struct{}{}:
default:
}
})
elasticsearchServer := httptest.NewServer(mux)
defer elasticsearchServer.Close()

srv := beatertest.NewServer(t, beatertest.WithConfig(agentconfig.MustNewConfigFrom(map[string]interface{}{
"apm-server": map[string]interface{}{
"wait_ready_interval": "100ms",
"data_streams.wait_for_integration": true,
},
"output.elasticsearch.hosts": []string{elasticsearchServer.URL},
})))

// Send some events to the server. They should be accepted and enqueued.
req := makeTransactionRequest(t, srv.URL)
req.Header.Add("Content-Type", "application/x-ndjson")
resp, err := srv.Client.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusAccepted, resp.StatusCode)
resp.Body.Close()

// Healthcheck should report that the server is not publish-ready.
resp, err = srv.Client.Get(srv.URL + api.RootPath)
require.NoError(t, err)
out := decodeJSONMap(t, resp.Body)
resp.Body.Close()
assert.Equal(t, false, out["publish_ready"])

// Stop the server.
srv.Close()

// No documents should be indexed.
select {
case <-bulkCh:
t.Fatal("unexpected bulk request")
case <-time.After(50 * time.Millisecond):
}
}

func TestTailSamplingPlatinumLicense(t *testing.T) {
bulkCh := make(chan struct{}, 1)
licenseReq := make(chan struct{})
Expand Down

0 comments on commit a01fcb6

Please sign in to comment.