diff --git a/config/crds/v1/all-crds.yaml b/config/crds/v1/all-crds.yaml index 712e681404..b48bf6db56 100644 --- a/config/crds/v1/all-crds.yaml +++ b/config/crds/v1/all-crds.yaml @@ -1462,6 +1462,15 @@ spec: description: KibanaAssociationStatus is the status of any auto-linking to Kibana. type: string + observedGeneration: + description: ObservedGeneration represents the .metadata.generation + that the status is based upon. It corresponds to the metadata generation, + which is updated on mutation by the API Server. If the generation + observed in status diverges from the generation in metadata, the + APM Server controller has not yet processed the changes contained + in the APM Server specification. + format: int64 + type: integer secretTokenSecret: description: SecretTokenSecretName is the name of the Secret that contains the secret token diff --git a/config/crds/v1/bases/apm.k8s.elastic.co_apmservers.yaml b/config/crds/v1/bases/apm.k8s.elastic.co_apmservers.yaml index 04bfb1886f..6521222e4c 100644 --- a/config/crds/v1/bases/apm.k8s.elastic.co_apmservers.yaml +++ b/config/crds/v1/bases/apm.k8s.elastic.co_apmservers.yaml @@ -7686,6 +7686,15 @@ spec: description: KibanaAssociationStatus is the status of any auto-linking to Kibana. type: string + observedGeneration: + description: ObservedGeneration represents the .metadata.generation + that the status is based upon. It corresponds to the metadata generation, + which is updated on mutation by the API Server. If the generation + observed in status diverges from the generation in metadata, the + APM Server controller has not yet processed the changes contained + in the APM Server specification. 
+ format: int64 + type: integer secretTokenSecret: description: SecretTokenSecretName is the name of the Secret that contains the secret token diff --git a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml index bbf2c9bdae..58448226d3 100644 --- a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml +++ b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml @@ -1474,6 +1474,15 @@ spec: description: KibanaAssociationStatus is the status of any auto-linking to Kibana. type: string + observedGeneration: + description: ObservedGeneration represents the .metadata.generation + that the status is based upon. It corresponds to the metadata generation, + which is updated on mutation by the API Server. If the generation + observed in status diverges from the generation in metadata, the + APM Server controller has not yet processed the changes contained + in the APM Server specification. + format: int64 + type: integer secretTokenSecret: description: SecretTokenSecretName is the name of the Secret that contains the secret token diff --git a/pkg/apis/apm/v1/apmserver_types.go b/pkg/apis/apm/v1/apmserver_types.go index b95d893c60..bf281258e3 100644 --- a/pkg/apis/apm/v1/apmserver_types.go +++ b/pkg/apis/apm/v1/apmserver_types.go @@ -63,14 +63,25 @@ type ApmServerSpec struct { // ApmServerStatus defines the observed state of ApmServer type ApmServerStatus struct { commonv1.DeploymentStatus `json:",inline"` + // ExternalService is the name of the service the agents should connect to. ExternalService string `json:"service,omitempty"` + // SecretTokenSecretName is the name of the Secret that contains the secret token SecretTokenSecretName string `json:"secretTokenSecret,omitempty"` + // ElasticsearchAssociationStatus is the status of any auto-linking to Elasticsearch clusters. 
ElasticsearchAssociationStatus commonv1.AssociationStatus `json:"elasticsearchAssociationStatus,omitempty"` + // KibanaAssociationStatus is the status of any auto-linking to Kibana. KibanaAssociationStatus commonv1.AssociationStatus `json:"kibanaAssociationStatus,omitempty"` + + // ObservedGeneration represents the .metadata.generation that the status is based upon. + // It corresponds to the metadata generation, which is updated on mutation by the API Server. + // If the generation observed in status diverges from the generation in metadata, the APM Server + // controller has not yet processed the changes contained in the APM Server specification. + // +kubebuilder:validation:Optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } // +kubebuilder:object:root=true @@ -186,6 +197,11 @@ func (as *ApmServer) SetAssociationStatusMap(typ commonv1.AssociationType, statu } } +// GetObservedGeneration will return the observedGeneration from the Elastic APM Server's status. +func (as *ApmServer) GetObservedGeneration() int64 { + return as.Status.ObservedGeneration +} + // ApmEsAssociation helps to manage the APMServer / Elasticsearch association type ApmEsAssociation struct { *ApmServer diff --git a/pkg/controller/apmserver/controller.go b/pkg/controller/apmserver/controller.go index 02230774e7..a2bd40757a 100644 --- a/pkg/controller/apmserver/controller.go +++ b/pkg/controller/apmserver/controller.go @@ -88,9 +88,8 @@ func Add(mgr manager.Manager, params operator.Parameters) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileApmServer { - client := mgr.GetClient() return &ReconcileApmServer{ - Client: client, + Client: mgr.GetClient(), recorder: mgr.GetEventRecorderFor(controllerName), dynamicWatches: watches.NewDynamicWatches(), Parameters: params, @@ -153,14 +152,18 @@ type ReconcileApmServer struct { iteration uint64 } +// K8sClient returns the kubernetes client 
from the APM Server reconciler. func (r *ReconcileApmServer) K8sClient() k8s.Client { return r.Client } +// DynamicWatches returns the set of dynamic watches from the APM Server reconciler. func (r *ReconcileApmServer) DynamicWatches() watches.DynamicWatches { return r.dynamicWatches } +// Recorder returns the Kubernetes recorder that is responsible for recording and reporting +// events from the APM Server reconciler. func (r *ReconcileApmServer) Recorder() record.EventRecorder { return r.recorder } @@ -200,30 +203,34 @@ func (r *ReconcileApmServer) Reconcile(ctx context.Context, request reconcile.Re return reconcile.Result{}, r.onDelete(k8s.ExtractNamespacedName(&as)) } + results, state := r.doReconcile(ctx, &as) + + return results.WithError(r.updateStatus(ctx, state)).Aggregate() +} + +func (r *ReconcileApmServer) doReconcile(ctx context.Context, as *apmv1.ApmServer) (*reconciler.Results, State) { + state := NewState(as) + results := reconciler.NewResult(ctx) + areAssocsConfigured, err := association.AreConfiguredIfSet(as.GetAssociations(), r.recorder) if err != nil { - return reconcile.Result{}, tracing.CaptureError(ctx, err) + return results.WithError(tracing.CaptureError(ctx, err)), state } if !areAssocsConfigured { - return reconcile.Result{}, nil + return results, state } - return r.doReconcile(ctx, request, &as) -} - -func (r *ReconcileApmServer) doReconcile(ctx context.Context, request reconcile.Request, as *apmv1.ApmServer) (reconcile.Result, error) { // Run validation in case the webhook is disabled if err := r.validate(ctx, as); err != nil { - return reconcile.Result{}, err + return results.WithError(err), state } - state := NewState(request, as) svc, err := common.ReconcileService(ctx, r.Client, NewService(*as), as) if err != nil { - return reconcile.Result{}, err + return results.WithError(err), state } - _, results := certificates.Reconciler{ + _, results = certificates.Reconciler{ K8sClient: r.K8sClient(), DynamicWatches: r.DynamicWatches(), Owner: 
as, @@ -237,22 +244,22 @@ func (r *ReconcileApmServer) doReconcile(ctx context.Context, request reconcile. GarbageCollectSecrets: true, }.ReconcileCAAndHTTPCerts(ctx) if results.HasError() { - res, err := results.Aggregate() + _, err := results.Aggregate() k8s.EmitErrorEvent(r.recorder, err, as, events.EventReconciliationError, "Certificate reconciliation error: %v", err) - return res, err + return results, state } asVersion, err := version.Parse(as.Spec.Version) if err != nil { - return reconcile.Result{}, err + return results.WithError(err), state } logger := log.WithValues("namespace", as.Namespace, "as_name", as.Name) assocAllowed, err := association.AllowVersion(asVersion, as, logger, r.recorder) if err != nil { - return reconcile.Result{}, tracing.CaptureError(ctx, err) + return results.WithError(tracing.CaptureError(ctx, err)), state } if !assocAllowed { - return reconcile.Result{}, nil // will eventually retry + return results, state // will eventually retry } r.warnIfDeprecated(asVersion, as) @@ -261,23 +268,17 @@ func (r *ReconcileApmServer) doReconcile(ctx context.Context, request reconcile. 
if err != nil { if apierrors.IsConflict(err) { log.V(1).Info("Conflict while updating status") - return reconcile.Result{Requeue: true}, nil + return results.WithResult(reconcile.Result{Requeue: true}), state } k8s.EmitErrorEvent(r.recorder, err, as, events.EventReconciliationError, "Deployment reconciliation error: %v", err) - return state.Result, tracing.CaptureError(ctx, err) + return results.WithError(tracing.CaptureError(ctx, err)), state } state.UpdateApmServerExternalService(*svc) - // update status - err = r.updateStatus(ctx, state) - if err != nil && apierrors.IsConflict(err) { - log.V(1).Info("Conflict while updating status", "namespace", as.Namespace, "as", as.Name) - return reconcile.Result{Requeue: true}, nil - } - res, err := results.WithError(err).Aggregate() + _, err = results.WithError(err).Aggregate() k8s.EmitErrorEvent(r.recorder, err, as, events.EventReconciliationError, "Reconciliation error: %v", err) - return res, err + return results, state } func (r *ReconcileApmServer) warnIfDeprecated(version semver.Version, as *apmv1.ApmServer) { @@ -345,12 +346,12 @@ func (r *ReconcileApmServer) updateStatus(ctx context.Context, state State) erro span, _ := apm.StartSpan(ctx, "update_status", tracing.SpanTypeApp) defer span.End() - current := state.originalApmServer - if reflect.DeepEqual(current.Status, state.ApmServer.Status) { + original := state.originalApmServer + if reflect.DeepEqual(original.Status, state.ApmServer.Status) { return nil } - if state.ApmServer.Status.IsDegraded(current.Status.DeploymentStatus) { - r.recorder.Event(current, corev1.EventTypeWarning, events.EventReasonUnhealthy, "Apm Server health degraded") + if state.ApmServer.Status.IsDegraded(original.Status.DeploymentStatus) { + r.recorder.Event(original, corev1.EventTypeWarning, events.EventReasonUnhealthy, "Apm Server health degraded") } log.V(1).Info("Updating status", "iteration", atomic.LoadUint64(&r.iteration), @@ -361,6 +362,7 @@ func (r *ReconcileApmServer) 
updateStatus(ctx context.Context, state State) erro return common.UpdateStatus(r.Client, state.ApmServer) } +// NewService returns the service used by the APM Server. func NewService(as apmv1.ApmServer) *corev1.Service { svc := corev1.Service{ ObjectMeta: as.Spec.HTTP.Service.ObjectMeta, diff --git a/pkg/controller/apmserver/controller_test.go b/pkg/controller/apmserver/controller_test.go index 95518a7545..a175071121 100644 --- a/pkg/controller/apmserver/controller_test.go +++ b/pkg/controller/apmserver/controller_test.go @@ -6,17 +6,23 @@ package apmserver import ( "context" + "fmt" "testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/reconcile" apmv1 "github.com/elastic/cloud-on-k8s/pkg/apis/apm/v1" commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1" + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/pkg/controller/common" "github.com/elastic/cloud-on-k8s/pkg/controller/common/certificates" "github.com/elastic/cloud-on-k8s/pkg/controller/common/operator" @@ -32,14 +38,10 @@ func TestReconcileApmServer_doReconcile(t *testing.T) { dynamicWatches watches.DynamicWatches Parameters operator.Parameters } - type args struct { - request reconcile.Request - } tests := []struct { name string as apmv1.ApmServer fields fields - args args wantRequeue bool wantErr bool }{ @@ -65,9 +67,6 @@ func TestReconcileApmServer_doReconcile(t *testing.T) { }, }, }, - args: args{ - request: reconcile.Request{}, - }, wantRequeue: false, }, { @@ -87,9 +86,6 @@ func TestReconcileApmServer_doReconcile(t *testing.T) { dynamicWatches: watches.NewDynamicWatches(), Parameters: operator.Parameters{}, }, - args: args{ - request: 
reconcile.Request{}, - }, wantErr: true, }, } @@ -101,15 +97,16 @@ func TestReconcileApmServer_doReconcile(t *testing.T) { dynamicWatches: tt.fields.dynamicWatches, Parameters: tt.fields.Parameters, } - got, err := r.doReconcile(context.Background(), tt.args.request, tt.as.DeepCopy()) + results, _ := r.doReconcile(context.Background(), tt.as.DeepCopy()) + res, err := results.Aggregate() if (err != nil) != tt.wantErr { t.Errorf("ReconcileApmServer.doReconcile() error = %v, wantErr %v", err, tt.wantErr) return } - require.NotNil(t, got) - require.Equal(t, got.Requeue, tt.wantRequeue) + require.NotNil(t, results) + require.Equal(t, res.Requeue, tt.wantRequeue) if tt.wantRequeue { - require.True(t, got.RequeueAfter > 0) + require.True(t, res.RequeueAfter > 0) } }) } @@ -255,3 +252,265 @@ func mkAPMServer(httpConf commonv1.HTTPConfig) apmv1.ApmServer { }, } } + +func TestReconcileApmServer_Reconcile(t *testing.T) { + sampleAPMObject := apmv1.ApmServer{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "test", + Generation: 2, + }, + Spec: apmv1.ApmServerSpec{ + Version: "7.0.1", + Count: 1, + }, + Status: apmv1.ApmServerStatus{ + ObservedGeneration: 1, + }, + } + defaultRequest := reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "test", + Namespace: "test", + }, + } + type fields struct { + Client k8s.Client + } + type args struct { + request reconcile.Request + } + tests := []struct { + name string + fields fields + args args + want reconcile.Result + wantErr bool + validate func(*testing.T, fields) + }{ + { + name: "unmanaged APM server does not increment observedGeneration", + fields: fields{ + Client: k8s.NewFakeClient( + withAnnotations(sampleAPMObject, map[string]string{common.ManagedAnnotation: "false"}), + ), + }, + args: args{ + request: defaultRequest, + }, + want: reconcile.Result{}, + wantErr: false, + //nolint:thelper + validate: func(t *testing.T, f fields) { + var apm apmv1.ApmServer + err := 
f.Client.Get(context.Background(), types.NamespacedName{Namespace: "test", Name: "test"}, &apm) + require.NoError(t, err) + require.Equal(t, int64(1), apm.Status.ObservedGeneration) + }, + }, + { + name: "Legacy finalizer on APM server gets removed, and updates observedGeneration", + fields: fields{ + Client: k8s.NewFakeClient( + withFinalizers(sampleAPMObject, []string{"finalizer.elasticsearch.k8s.elastic.co/secure-settings-secret"}), + ), + }, + args: args{ + request: defaultRequest, + }, + want: reconcile.Result{}, + wantErr: false, + validate: func(t *testing.T, f fields) { + t.Helper() + var apm apmv1.ApmServer + err := f.Client.Get(context.Background(), types.NamespacedName{Namespace: "test", Name: "test"}, &apm) + require.NoError(t, err) + require.Len(t, apm.ObjectMeta.Finalizers, 0) + require.Equal(t, int64(2), apm.Status.ObservedGeneration) + }, + }, + { + name: "With Elasticsearch association not ready, observedGeneration is updated", + fields: fields{ + Client: k8s.NewFakeClient( + withESReference(sampleAPMObject, commonv1.ObjectSelector{Name: "testes"}), + ), + }, + args: args{ + request: defaultRequest, + }, + want: reconcile.Result{}, + wantErr: false, + validate: func(t *testing.T, f fields) { + t.Helper() + var apm apmv1.ApmServer + err := f.Client.Get(context.Background(), types.NamespacedName{Namespace: "test", Name: "test"}, &apm) + require.NoError(t, err) + require.Equal(t, int64(2), apm.Status.ObservedGeneration) + }, + }, + { + name: "With Elasticsearch association ready, but APM version not allowed with Elasticsearch version, observedGeneration is updated", + fields: fields{ + Client: k8s.NewFakeClient( + &esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testes", + Namespace: "test", + }, + Spec: esv1.ElasticsearchSpec{ + Version: "7.16.2", + }, + }, + withAssociationConf(*(withESReference(sampleAPMObject, commonv1.ObjectSelector{Name: "testes", Namespace: "test"})), commonv1.AssociationConf{ + AuthSecretName: 
"testes-es-elastic-user", + AuthSecretKey: "elastic", + CASecretName: "ca-secret", + CACertProvided: false, + URL: "https://es:9200", + // This will be considered an invalid version, as it's considered 'not reported yet'. + Version: "", + }), + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testes-es-elastic-user", + Namespace: "test", + }, + Data: map[string][]byte{ + "elastic": []byte("password"), + }, + }, + ), + }, + args: args{ + request: defaultRequest, + }, + want: reconcile.Result{}, + wantErr: false, + validate: func(t *testing.T, f fields) { + t.Helper() + var apm apmv1.ApmServer + err := f.Client.Get(context.Background(), types.NamespacedName{Namespace: "test", Name: "test"}, &apm) + require.NoError(t, err) + require.Equal(t, int64(2), apm.Status.ObservedGeneration) + }, + }, + { + name: "With validation issues, observedGeneration is updated", + fields: fields{ + Client: k8s.NewFakeClient( + withName(sampleAPMObject, "superlongapmservernamecausesvalidationissues"), + ), + }, + args: args{ + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "superlongapmservernamecausesvalidationissues", + Namespace: "test", + }, + }, + }, + want: reconcile.Result{}, + wantErr: true, + validate: func(t *testing.T, f fields) { + t.Helper() + var apm apmv1.ApmServer + err := f.Client.Get(context.Background(), types.NamespacedName{Namespace: "test", Name: "superlongapmservernamecausesvalidationissues"}, &apm) + require.NoError(t, err) + require.Equal(t, int64(2), apm.Status.ObservedGeneration) + }, + }, + { + name: "Reconcile of standard APM object updates observedGeneration, and creates deployment", + fields: fields{ + Client: k8s.NewFakeClient( + &sampleAPMObject, + ), + }, + args: args{ + request: defaultRequest, + }, + want: reconcile.Result{}, + wantErr: false, + validate: func(t *testing.T, f fields) { + t.Helper() + var apm apmv1.ApmServer + err := f.Client.Get(context.Background(), types.NamespacedName{Namespace: "test", Name: 
"test"}, &apm) + require.NoError(t, err) + require.Len(t, apm.ObjectMeta.Finalizers, 0) + require.Equal(t, int64(2), apm.Status.ObservedGeneration) + var deploymentList appsv1.DeploymentList + err = f.Client.List(context.Background(), &deploymentList) + require.NoError(t, err) + require.Len(t, deploymentList.Items, 1) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &ReconcileApmServer{ + Client: tt.fields.Client, + recorder: record.NewFakeRecorder(100), + dynamicWatches: watches.NewDynamicWatches(), + Parameters: operator.Parameters{}, + } + got, err := r.Reconcile(context.Background(), tt.args.request) + if (err != nil) != tt.wantErr { + t.Errorf("ReconcileApmServer.Reconcile() error = %v, wantErr %v", err, tt.wantErr) + return + } + // RequeueAfter is ignored here, as certificate reconciler sets this to expiration of the generated certificates. + if !cmp.Equal(got, tt.want, cmpopts.IgnoreFields(reconcile.Result{}, "RequeueAfter")) { + t.Errorf("ReconcileApmServer.Reconcile() = %v, want %v", got, tt.want) + } + tt.validate(t, tt.fields) + }) + } +} + +func withAnnotations(apm apmv1.ApmServer, annotations map[string]string) *apmv1.ApmServer { + obj := apm.DeepCopy() + obj.ObjectMeta.Annotations = annotations + return obj +} + +func withFinalizers(apm apmv1.ApmServer, finalizers []string) *apmv1.ApmServer { + obj := apm.DeepCopy() + obj.ObjectMeta.Finalizers = finalizers + return obj +} + +func withESReference(apm apmv1.ApmServer, selector commonv1.ObjectSelector) *apmv1.ApmServer { + obj := apm.DeepCopy() + obj.Spec.ElasticsearchRef = selector + return obj +} + +func withAssociationConf(apm apmv1.ApmServer, conf commonv1.AssociationConf) *apmv1.ApmServer { + obj := apm.DeepCopy() + association := apmv1.NewApmEsAssociation(obj) + association.SetAssociationConf( + &commonv1.AssociationConf{ + AuthSecretName: "auth-secret", + AuthSecretKey: "elastic", + CASecretName: "ca-secret", + CACertProvided: true, + URL: 
"https://es.svc:9200", + }, + ) + association.SetAnnotations(map[string]string{ + association.AssociationConfAnnotationName(): `{"authSecretName":"auth-secret", "authSecretKey":"elastic", "caSecretName": "ca-secret", "url":"https://es.svc:9200"}`, + }) + associated := association.Associated() + apmserver, ok := associated.(*apmv1.ApmServer) + if !ok { + panic(fmt.Sprintf("expected *apmv1.ApmServer, got: %T", associated)) + } + return apmserver +} + +func withName(apm apmv1.ApmServer, name string) *apmv1.ApmServer { + obj := apm.DeepCopy() + obj.ObjectMeta.Name = name + return obj +} diff --git a/pkg/controller/apmserver/state.go b/pkg/controller/apmserver/state.go index dd3fb97851..87a83b298e 100644 --- a/pkg/controller/apmserver/state.go +++ b/pkg/controller/apmserver/state.go @@ -7,7 +7,6 @@ package apmserver import ( v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "sigs.k8s.io/controller-runtime/pkg/reconcile" apmv1 "github.com/elastic/cloud-on-k8s/pkg/apis/apm/v1" "github.com/elastic/cloud-on-k8s/pkg/controller/common" @@ -16,17 +15,16 @@ import ( // State holds the accumulated state during the reconcile loop including the response and a pointer to an ApmServer // resource for status updates. type State struct { - ApmServer *apmv1.ApmServer - Result reconcile.Result - Request reconcile.Request - + ApmServer *apmv1.ApmServer originalApmServer *apmv1.ApmServer } -// NewState creates a new reconcile state based on the given request and ApmServer resource with the resource -// state reset to empty. -func NewState(request reconcile.Request, as *apmv1.ApmServer) State { - return State{Request: request, ApmServer: as, originalApmServer: as.DeepCopy()} +// NewState creates a new reconcile state based on the given request and ApmServer resource, with the +// ApmServer's Status.ObservedGeneration set from the current generation of the ApmServer's specification. 
+func NewState(as *apmv1.ApmServer) State { + current := as.DeepCopy() + current.Status.ObservedGeneration = as.Generation + return State{ApmServer: current, originalApmServer: as} } // UpdateApmServerState updates the ApmServer status based on the given deployment. diff --git a/pkg/controller/common/driver/interface.go b/pkg/controller/common/driver/interface.go index 39bcc8befa..f51846137c 100644 --- a/pkg/controller/common/driver/interface.go +++ b/pkg/controller/common/driver/interface.go @@ -25,14 +25,18 @@ type TestDriver struct { FakeRecorder *record.FakeRecorder } +// K8sClient returns the Kubernetes client held by the test driver. func (t TestDriver) K8sClient() k8s.Client { return t.Client } +// DynamicWatches returns the set of dynamic watches held by the test driver. func (t TestDriver) DynamicWatches() watches.DynamicWatches { return t.Watches } +// Recorder returns the fake event recorder held by the test driver, exposed through the +// record.EventRecorder interface. func (t TestDriver) Recorder() record.EventRecorder { return t.FakeRecorder } diff --git a/test/e2e/apm/upgrade_test.go b/test/e2e/apm/upgrade_test.go new file mode 100644 index 0000000000..c0f3f9b9a9 --- /dev/null +++ b/test/e2e/apm/upgrade_test.go @@ -0,0 +1,40 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +// +build apm e2e + +package apm + +import ( + "testing" + + "github.com/elastic/cloud-on-k8s/test/e2e/test" + "github.com/elastic/cloud-on-k8s/test/e2e/test/apmserver" + "github.com/elastic/cloud-on-k8s/test/e2e/test/elasticsearch" +) + +func TestAPMServerVersionUpgradeToLatest8x(t *testing.T) { + srcVersion := test.Ctx().ElasticStackVersion + dstVersion := test.LatestReleasedVersion8x + + test.SkipInvalidUpgrade(t, srcVersion, dstVersion) + + name := "apmserver-upgrade" + esBuilder := elasticsearch.NewBuilder(name). + WithVersion(srcVersion). + WithESMasterDataNodes(3, elasticsearch.DefaultResources) + + apmServerBuilder := apmserver.NewBuilder(name).WithVersion(srcVersion).WithElasticsearchRef(esBuilder.Ref()).WithoutIntegrationCheck() + + mutated := apmServerBuilder.WithVersion(dstVersion).WithElasticsearchRef(esBuilder.Ref()).WithMutatedFrom(&apmServerBuilder).WithoutIntegrationCheck() + + test.RunMutations( + t, + []test.Builder{esBuilder, apmServerBuilder}, + []test.Builder{ + esBuilder.WithVersion(dstVersion).WithMutatedFrom(&esBuilder), + mutated, + }, + ) +} diff --git a/test/e2e/es/mutation_test.go b/test/e2e/es/mutation_test.go index 0b0061a907..0244fe9223 100644 --- a/test/e2e/es/mutation_test.go +++ b/test/e2e/es/mutation_test.go @@ -13,14 +13,15 @@ import ( "testing" "time" - esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" - "github.com/elastic/cloud-on-k8s/pkg/dev/portforward" - "github.com/elastic/cloud-on-k8s/test/e2e/test" - "github.com/elastic/cloud-on-k8s/test/e2e/test/elasticsearch" "github.com/stretchr/testify/assert" vegeta "github.com/tsenart/vegeta/lib" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + + esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1" + "github.com/elastic/cloud-on-k8s/pkg/dev/portforward" + "github.com/elastic/cloud-on-k8s/test/e2e/test" + "github.com/elastic/cloud-on-k8s/test/e2e/test/elasticsearch" ) // TestMutationHTTPToHTTPS creates a 3 node cluster running 
without TLS on the HTTP layer, diff --git a/test/e2e/samples_test.go b/test/e2e/samples_test.go index 874f3e481a..b9d33a598f 100644 --- a/test/e2e/samples_test.go +++ b/test/e2e/samples_test.go @@ -12,6 +12,9 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/rand" + commonv1 "github.com/elastic/cloud-on-k8s/pkg/apis/common/v1" "github.com/elastic/cloud-on-k8s/test/e2e/cmd/run" "github.com/elastic/cloud-on-k8s/test/e2e/test" @@ -20,8 +23,6 @@ import ( "github.com/elastic/cloud-on-k8s/test/e2e/test/enterprisesearch" "github.com/elastic/cloud-on-k8s/test/e2e/test/helper" "github.com/elastic/cloud-on-k8s/test/e2e/test/kibana" - "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/util/rand" ) func TestSamples(t *testing.T) { @@ -71,7 +72,8 @@ func createBuilders(t *testing.T, decoder *helper.YAMLDecoder, sampleFile, testN WithConfig(map[string]interface{}{"apm-server.ilm.enabled": false}). WithRestrictedSecurityContext(). WithLabel(run.TestNameLabel, fullTestName). - WithPodLabel(run.TestNameLabel, fullTestName) + WithPodLabel(run.TestNameLabel, fullTestName). + WithoutIntegrationCheck() case enterprisesearch.Builder: return b.WithNamespace(namespace). WithSuffix(suffix). 
diff --git a/test/e2e/smoke_test.go b/test/e2e/smoke_test.go index 52ba687473..f6f78b314b 100644 --- a/test/e2e/smoke_test.go +++ b/test/e2e/smoke_test.go @@ -11,13 +11,14 @@ import ( "os" "testing" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/yaml" + "github.com/elastic/cloud-on-k8s/test/e2e/cmd/run" "github.com/elastic/cloud-on-k8s/test/e2e/test" "github.com/elastic/cloud-on-k8s/test/e2e/test/apmserver" "github.com/elastic/cloud-on-k8s/test/e2e/test/elasticsearch" "github.com/elastic/cloud-on-k8s/test/e2e/test/kibana" - "k8s.io/apimachinery/pkg/util/rand" - "k8s.io/apimachinery/pkg/util/yaml" ) const sampleFile = "../../config/samples/apm/apm_es_kibana.yaml" @@ -64,7 +65,8 @@ func TestSmoke(t *testing.T) { }). WithRestrictedSecurityContext(). WithLabel(run.TestNameLabel, testName). - WithPodLabel(run.TestNameLabel, testName) + WithPodLabel(run.TestNameLabel, testName). + WithoutIntegrationCheck() test.Sequence(nil, test.EmptySteps, esBuilder, kbBuilder, apmBuilder). 
RunSequential(t) diff --git a/test/e2e/test/apmserver/builder.go b/test/e2e/test/apmserver/builder.go index dd32e0c2dc..2f2c380239 100644 --- a/test/e2e/test/apmserver/builder.go +++ b/test/e2e/test/apmserver/builder.go @@ -22,6 +22,8 @@ import ( // Builder to create APM Servers type Builder struct { ApmServer apmv1.ApmServer + + MutatedFrom *Builder } var _ test.Builder = Builder{} @@ -112,6 +114,17 @@ func (b Builder) WithKibanaRef(ref commonv1.ObjectSelector) Builder { return b } +func (b Builder) DeepCopy() *Builder { + apm := b.ApmServer.DeepCopy() + builderCopy := Builder{ + ApmServer: *apm, + } + if b.MutatedFrom != nil { + builderCopy.MutatedFrom = b.MutatedFrom.DeepCopy() + } + return &builderCopy +} + func (b Builder) WithConfig(cfg map[string]interface{}) Builder { if b.ApmServer.Spec.Config == nil || b.ApmServer.Spec.Config.Data == nil { b.ApmServer.Spec.Config = &commonv1.Config{ @@ -120,10 +133,12 @@ func (b Builder) WithConfig(cfg map[string]interface{}) Builder { return b } + newBuilder := b.DeepCopy() + for k, v := range cfg { - b.ApmServer.Spec.Config.Data[k] = v + newBuilder.ApmServer.Spec.Config.Data[k] = v } - return b + return *newBuilder } func (b Builder) WithRUM(enabled bool) Builder { @@ -171,6 +186,11 @@ func (b Builder) WithoutIntegrationCheck() Builder { }) } +func (b Builder) WithMutatedFrom(builder *Builder) Builder { + b.MutatedFrom = builder + return b +} + func (b Builder) NSN() types.NamespacedName { return k8s.ExtractNamespacedName(&b.ApmServer) } diff --git a/test/e2e/test/apmserver/checks_apm.go b/test/e2e/test/apmserver/checks_apm.go index 295e15a118..17d069004f 100644 --- a/test/e2e/test/apmserver/checks_apm.go +++ b/test/e2e/test/apmserver/checks_apm.go @@ -12,6 +12,7 @@ import ( "net/http" "reflect" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,14 +24,17 @@ import ( "github.com/elastic/cloud-on-k8s/pkg/controller/common/version" 
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" + "github.com/elastic/cloud-on-k8s/pkg/utils/retry" "github.com/elastic/cloud-on-k8s/test/e2e/test" "github.com/elastic/cloud-on-k8s/test/e2e/test/elasticsearch" "github.com/elastic/cloud-on-k8s/test/e2e/test/kibana" ) type apmClusterChecks struct { - apmClient *ApmClient - esClient client.Client + apmClient *ApmClient + esClient client.Client + metricIndexCount int + errorIndexCount int } func (b Builder) CheckStackTestSteps(k *test.K8sClient) test.StepList { @@ -39,8 +43,7 @@ func (b Builder) CheckStackTestSteps(k *test.K8sClient) test.StepList { a.BuildApmServerClient(b.ApmServer, k), a.CheckApmServerReachable(), a.CheckApmServerVersion(b.ApmServer), - a.CheckEventsAPI(), - a.CheckEventsInElasticsearch(b.ApmServer, k), + a.CheckAPMEventCanBeIndexedInElasticsearch(b.ApmServer, k), a.CheckRUMEventsAPI(b.RUMEnabled()), }.WithSteps(a.CheckAgentConfiguration(b.ApmServer, k)) } @@ -119,29 +122,71 @@ func (c *apmClusterChecks) CheckApmServerVersion(apm apmv1.ApmServer) test.Step } } -//nolint:thelper -func (c *apmClusterChecks) CheckEventsAPI() test.Step { +// CheckAPMEventCanBeIndexedInElasticsearch ensures that any event that is sent to APM Server +// eventually ends up within an Elasticsearch index. The index name varies between versions. +// APM Server version < 8.x creates an index, and writes data to a named index. APM Server +// version >= 8.x writes documents to a datastream, and an index is auto-created. +// This test step has to be eventual, as a transient issue happens when upgrading +// Elasticsearch between major versions where it takes a bit of time to transition between +// file-based user roles, and permissions errors are returned from Elasticsearch. 
+func (c *apmClusterChecks) CheckAPMEventCanBeIndexedInElasticsearch(apm apmv1.ApmServer, k *test.K8sClient) test.Step { + return test.Step{ + Name: "ApmServer should accept event and write data to Elasticsearch", + Test: test.Eventually(func() error { + // Not all APM Server tests have an Elasticsearch reference; skip the check when none is defined. + if !apm.Spec.ElasticsearchRef.IsDefined() { + return nil + } + if err := c.checkEventsAPI(apm); err != nil { + return err + } + return retry.UntilSuccess(func() error { + return c.checkEventsInElasticsearch(apm, k) + }, 30*time.Second, 2*time.Second) + }), + } +} + +func (c *apmClusterChecks) checkEventsAPI(apm apmv1.ApmServer) error { sampleBody := `{"metadata": { "service": {"name": "1234_service-12a3", "language": {"name": "ecmascript"}, "agent": {"version": "3.14.0", "name": "elastic-node"}}}} { "error": {"id": "abcdef0123456789", "timestamp": 1533827045999000,"log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} { "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": 1496170422281000 }}` + // Before sending the event, record the current document counts of the metric and error + // indices; they are used to calculate how many documents each index should contain after + // the event is sent through APM Server. + metricIndex, errorIndex, err := getIndexNames(apm) + if err != nil { + return err + } - return test.Step{ - Name: "Events should be accepted", - Test: func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), DefaultReqTimeout) - defer cancel() - eventsErrorResponse, err := c.apmClient.IntakeV2Events(ctx, false, []byte(sampleBody)) - require.NoError(t, err) + var count int + count, err = countIndex(c.esClient, metricIndex) + // 404 is acceptable in this scenario, as the index may not exist yet.
+ if err != nil && !client.IsNotFound(err) { + return err + } + c.metricIndexCount = count - // in the happy case, we get no error response - assert.Nil(t, eventsErrorResponse) - if eventsErrorResponse != nil { - // provide more details: - assert.Equal(t, 2, eventsErrorResponse.Accepted) - assert.Len(t, eventsErrorResponse.Errors, 0) - } - }, + count, err = countIndex(c.esClient, errorIndex) + // 404 is acceptable in this scenario, as the index may not exist yet. + if err != nil && !client.IsNotFound(err) { + return err + } + c.errorIndexCount = count + + ctx, cancel := context.WithTimeout(context.Background(), DefaultReqTimeout) + defer cancel() + eventsErrorResponse, err := c.apmClient.IntakeV2Events(ctx, false, []byte(sampleBody)) + if err != nil { + return err } + + // in the happy case, we get no error response + if eventsErrorResponse != nil { + return fmt.Errorf("expected no error response when sending event to apm server got: %v", *eventsErrorResponse) + } + + return nil } func (c *apmClusterChecks) CheckRUMEventsAPI(rumEnabled bool) test.Step { @@ -181,47 +226,53 @@ type CountResult struct { // CheckEventsInElasticsearch checks that the events sent in the previous step have been stored. // We only count document to not rely on the internal schema of the APM Server. 
-func (c *apmClusterChecks) CheckEventsInElasticsearch(apm apmv1.ApmServer, k *test.K8sClient) test.Step { - return test.Step{ - Name: "Events should eventually show up in Elasticsearch", - Test: test.Eventually(func() error { - // Fetch the last version of the APM Server - var updatedApmServer apmv1.ApmServer - if err := k.Client.Get(context.Background(), k8s.ExtractNamespacedName(&apm), &updatedApmServer); err != nil { - return err - } +func (c *apmClusterChecks) checkEventsInElasticsearch(apm apmv1.ApmServer, k *test.K8sClient) error { + var updatedApmServer apmv1.ApmServer + if err := k.Client.Get(context.Background(), k8s.ExtractNamespacedName(&apm), &updatedApmServer); err != nil { + return err + } - if !updatedApmServer.Spec.ElasticsearchRef.IsDefined() { - // No ES is referenced, do not try to check data - return nil - } + if !updatedApmServer.Spec.ElasticsearchRef.IsDefined() { + // No ES is referenced, do not try to check data + return nil + } - v, err := version.Parse(updatedApmServer.Spec.Version) - if err != nil { - return err - } + metricIndex, errorIndex, err := getIndexNames(updatedApmServer) + if err != nil { + return err + } - // Check that the metric and error have been stored - // default to indices names from 6.x - metricIndex := fmt.Sprintf("apm-%s-2017.05.30", updatedApmServer.EffectiveVersion()) - errorIndex := fmt.Sprintf("apm-%s-2018.08.09", updatedApmServer.EffectiveVersion()) - switch v.Major { - case 7: - metricIndex = fmt.Sprintf("apm-%s-metric-2017.05.30", updatedApmServer.EffectiveVersion()) - errorIndex = fmt.Sprintf("apm-%s-error-2018.08.09", updatedApmServer.EffectiveVersion()) - case 8: - // these are datastreams and not indices, but can be searched/counted in the same way - metricIndex = "metrics-apm.app.1234_service_12a3-default" - errorIndex = "logs-apm.error-default" - } + if err := assertCountIndexEqual(c.esClient, metricIndex, c.metricIndexCount+1); err != nil { + return err + } - if err := assertCountIndexEqual(c.esClient, 
metricIndex, 1); err != nil { - return err - } + return assertCountIndexEqual(c.esClient, errorIndex, c.errorIndexCount+1) +} - return assertCountIndexEqual(c.esClient, errorIndex, 1) - }), +// getIndexNames returns the names of the metric and error indices for the version of the +// given APM Server, along with any error encountered while parsing that version. +func getIndexNames(apm apmv1.ApmServer) (string, string, error) { + var metricIndex, errorIndex string + v, err := version.Parse(apm.Spec.Version) + if err != nil { + return metricIndex, errorIndex, err } + + // Default to the index names used by 6.x; the switch below overrides them + // for later major versions. + metricIndex = fmt.Sprintf("apm-%s-2017.05.30", apm.EffectiveVersion()) + errorIndex = fmt.Sprintf("apm-%s-2018.08.09", apm.EffectiveVersion()) + switch v.Major { + case 7: + metricIndex = fmt.Sprintf("apm-%s-metric-2017.05.30", apm.EffectiveVersion()) + errorIndex = fmt.Sprintf("apm-%s-error-2018.08.09", apm.EffectiveVersion()) + case 8: + // these are datastreams and not indices, but can be searched/counted in the same way + metricIndex = "metrics-apm.app.1234_service_12a3-default" + errorIndex = "logs-apm.error-default" + } + + return metricIndex, errorIndex, nil } // assertCountIndexEqual asserts that the number of document in an index is the expected one, it raises an error otherwise.
@@ -231,7 +282,7 @@ func assertCountIndexEqual(esClient client.Client, index string, expected int) e return err } if metricCount != expected { - return fmt.Errorf("%d document expected in index %s, got %d instead", expected, index, metricCount) + return fmt.Errorf("%d documents expected in index %s, got %d instead", expected, index, metricCount) } return nil } diff --git a/test/e2e/test/apmserver/checks_k8s.go b/test/e2e/test/apmserver/checks_k8s.go index 9c27e4378d..ec8e421bbb 100644 --- a/test/e2e/test/apmserver/checks_k8s.go +++ b/test/e2e/test/apmserver/checks_k8s.go @@ -117,6 +117,7 @@ func CheckStatus(b Builder, k *test.K8sClient) test.Step { // don't check association statuses that may vary across tests as.Status.ElasticsearchAssociationStatus = "" as.Status.KibanaAssociationStatus = "" + as.Status.ObservedGeneration = 0 // Selector is a string built from a map, it is validated with a dedicated function. // The expected value is hardcoded on purpose to ensure there is no regression in the way the set of labels diff --git a/test/e2e/test/apmserver/steps_mutation.go b/test/e2e/test/apmserver/steps_mutation.go index 8883cc4b2c..10f28635f5 100644 --- a/test/e2e/test/apmserver/steps_mutation.go +++ b/test/e2e/test/apmserver/steps_mutation.go @@ -10,10 +10,19 @@ import ( apmv1 "github.com/elastic/cloud-on-k8s/pkg/apis/apm/v1" "github.com/elastic/cloud-on-k8s/pkg/utils/k8s" "github.com/elastic/cloud-on-k8s/test/e2e/test" + "github.com/elastic/cloud-on-k8s/test/e2e/test/generation" ) func (b Builder) MutationTestSteps(k *test.K8sClient) test.StepList { - panic("not implemented") + var apmServerGenerationBeforeMutation, apmServerObservedGenerationBeforeMutation int64 + isMutated := b.MutatedFrom != nil + + return test.StepList{ + generation.RetrieveGenerationsStep(&b.ApmServer, k, &apmServerGenerationBeforeMutation, &apmServerObservedGenerationBeforeMutation), + }.WithSteps(b.UpgradeTestSteps(k)). + WithSteps(b.CheckK8sTestSteps(k)). 
+ WithSteps(b.CheckStackTestSteps(k)). + WithStep(generation.CompareObjectGenerationsStep(&b.ApmServer, k, isMutated, apmServerGenerationBeforeMutation, apmServerObservedGenerationBeforeMutation)) } func (b Builder) UpgradeTestSteps(k *test.K8sClient) test.StepList {