Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new operator flag to control Elasticsearch health observation intervals #5861

Merged
merged 5 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ func Command() *cobra.Command {
3*time.Minute,
"Default timeout for requests made by the Elasticsearch client.",
)
cmd.Flags().Duration(
operator.ElasticsearchObservationIntervalFlag,
10*time.Second,
thbkrkr marked this conversation as resolved.
Show resolved Hide resolved
"Interval between observations of Elasticsearch health, non-positive values disable asynchronous observation",
)
cmd.Flags().Bool(
operator.DisableTelemetryFlag,
false,
Expand Down Expand Up @@ -580,12 +585,13 @@ func startOperator(ctx context.Context) error {
}

params := operator.Parameters{
Dialer: dialer,
ExposedNodeLabels: exposedNodeLabels,
IPFamily: ipFamily,
OperatorNamespace: operatorNamespace,
OperatorInfo: operatorInfo,
GlobalCA: ca,
Dialer: dialer,
ElasticsearchObservationInterval: viper.GetDuration(operator.ElasticsearchObservationIntervalFlag),
ExposedNodeLabels: exposedNodeLabels,
IPFamily: ipFamily,
OperatorNamespace: operatorNamespace,
OperatorInfo: operatorInfo,
GlobalCA: ca,
CACertRotation: certificates.RotationParams{
Validity: caCertValidity,
RotateBefore: caCertRotateBefore,
Expand Down
65 changes: 33 additions & 32 deletions pkg/controller/common/operator/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,37 @@
package operator

const (
AutoPortForwardFlag = "auto-port-forward"
CADirFlag = "ca-dir"
CACertRotateBeforeFlag = "ca-cert-rotate-before"
CACertValidityFlag = "ca-cert-validity"
CertRotateBeforeFlag = "cert-rotate-before"
CertValidityFlag = "cert-validity"
ConfigFlag = "config"
ContainerRegistryFlag = "container-registry"
DebugHTTPListenFlag = "debug-http-listen"
DisableConfigWatch = "disable-config-watch"
DisableTelemetryFlag = "disable-telemetry"
DistributionChannelFlag = "distribution-channel"
ElasticsearchClientTimeout = "elasticsearch-client-timeout"
EnableLeaderElection = "enable-leader-election"
EnableTracingFlag = "enable-tracing"
EnableWebhookFlag = "enable-webhook"
EnforceRBACOnRefsFlag = "enforce-rbac-on-refs"
ExposedNodeLabels = "exposed-node-labels"
IPFamilyFlag = "ip-family"
KubeClientTimeout = "kube-client-timeout"
ManageWebhookCertsFlag = "manage-webhook-certs"
MaxConcurrentReconcilesFlag = "max-concurrent-reconciles"
MetricsPortFlag = "metrics-port"
NamespacesFlag = "namespaces"
OperatorNamespaceFlag = "operator-namespace"
SetDefaultSecurityContextFlag = "set-default-security-context"
TelemetryIntervalFlag = "telemetry-interval"
UBIOnlyFlag = "ubi-only"
ValidateStorageClassFlag = "validate-storage-class"
WebhookCertDirFlag = "webhook-cert-dir"
WebhookNameFlag = "webhook-name"
WebhookSecretFlag = "webhook-secret"
AutoPortForwardFlag = "auto-port-forward"
CADirFlag = "ca-dir"
CACertRotateBeforeFlag = "ca-cert-rotate-before"
CACertValidityFlag = "ca-cert-validity"
CertRotateBeforeFlag = "cert-rotate-before"
CertValidityFlag = "cert-validity"
ConfigFlag = "config"
ContainerRegistryFlag = "container-registry"
DebugHTTPListenFlag = "debug-http-listen"
DisableConfigWatch = "disable-config-watch"
DisableTelemetryFlag = "disable-telemetry"
DistributionChannelFlag = "distribution-channel"
ElasticsearchClientTimeout = "elasticsearch-client-timeout"
ElasticsearchObservationIntervalFlag = "elasticsearch-observation-interval"
EnableLeaderElection = "enable-leader-election"
EnableTracingFlag = "enable-tracing"
EnableWebhookFlag = "enable-webhook"
EnforceRBACOnRefsFlag = "enforce-rbac-on-refs"
ExposedNodeLabels = "exposed-node-labels"
IPFamilyFlag = "ip-family"
KubeClientTimeout = "kube-client-timeout"
ManageWebhookCertsFlag = "manage-webhook-certs"
MaxConcurrentReconcilesFlag = "max-concurrent-reconciles"
MetricsPortFlag = "metrics-port"
NamespacesFlag = "namespaces"
OperatorNamespaceFlag = "operator-namespace"
SetDefaultSecurityContextFlag = "set-default-security-context"
TelemetryIntervalFlag = "telemetry-interval"
UBIOnlyFlag = "ubi-only"
ValidateStorageClassFlag = "validate-storage-class"
WebhookCertDirFlag = "webhook-cert-dir"
WebhookNameFlag = "webhook-name"
WebhookSecretFlag = "webhook-secret"
)
4 changes: 4 additions & 0 deletions pkg/controller/common/operator/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package operator

import (
"time"

"go.elastic.co/apm/v2"
corev1 "k8s.io/api/core/v1"

Expand All @@ -16,6 +18,8 @@ import (

// Parameters contain parameters to create new operators.
type Parameters struct {
// ElasticsearchObservationInterval is the interval between (asynchronous) observations of Elasticsearch health.
ElasticsearchObservationInterval time.Duration
// ExposedNodeLabels holds regular expressions of node labels which are allowed to be automatically set as annotations on Elasticsearch Pods.
ExposedNodeLabels esvalidation.NodeLabels
// OperatorNamespace is the control plane namespace of the operator.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/elasticsearch/elasticsearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileEl
Client: client,
recorder: mgr.GetEventRecorderFor(name),
licenseChecker: license.NewLicenseChecker(client, params.OperatorNamespace),
esObservers: observer.NewManager(params.Tracer),
esObservers: observer.NewManager(params.ElasticsearchObservationInterval, params.Tracer),

dynamicWatches: watches.NewDynamicWatches(),
expectations: expectations.NewClustersExpectations(client),
Expand Down
36 changes: 27 additions & 9 deletions pkg/controller/elasticsearch/observer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package observer

import (
"sync"
"time"

"go.elastic.co/apm/v2"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -23,18 +24,20 @@ const (

// Manager for a set of observers
type Manager struct {
observerLock sync.RWMutex
observers map[types.NamespacedName]*Observer
listenerLock sync.RWMutex
listeners []OnObservation // invoked on each observation event
tracer *apm.Tracer
defaultInterval time.Duration
thbkrkr marked this conversation as resolved.
Show resolved Hide resolved
observerLock sync.RWMutex
observers map[types.NamespacedName]*Observer
listenerLock sync.RWMutex
listeners []OnObservation // invoked on each observation event
tracer *apm.Tracer
}

// NewManager returns a new manager
func NewManager(tracer *apm.Tracer) *Manager {
func NewManager(defaultInterval time.Duration, tracer *apm.Tracer) *Manager {
return &Manager{
observers: make(map[types.NamespacedName]*Observer),
tracer: tracer,
defaultInterval: defaultInterval,
observers: make(map[types.NamespacedName]*Observer),
tracer: tracer,
}
}

Expand Down Expand Up @@ -68,6 +71,9 @@ func (m *Manager) Observe(cluster esv1.Elasticsearch, esClient client.Client) *O
return m.createOrReplaceObserver(nsName, settings, esClient)
case exists && (!observer.esClient.Equal(esClient) || observer.settings != settings):
return m.createOrReplaceObserver(nsName, settings, esClient)
case exists && settings.ObservationInterval <= 0:
// in case asynchronous observation has been disabled ensure at least one observation at reconciliation time.
return m.getAndObserveSynchronously(nsName)
default:
esClient.Close()
return observer
Expand All @@ -77,7 +83,7 @@ func (m *Manager) Observe(cluster esv1.Elasticsearch, esClient client.Client) *O
// extractObserverSettings extracts observer settings from the annotations on the Elasticsearch resource.
func (m *Manager) extractObserverSettings(cluster esv1.Elasticsearch) Settings {
return Settings{
ObservationInterval: annotation.ExtractTimeout(cluster.ObjectMeta, ObserverIntervalAnnotation, defaultObservationInterval),
ObservationInterval: annotation.ExtractTimeout(cluster.ObjectMeta, ObserverIntervalAnnotation, m.defaultInterval),
Tracer: m.tracer,
}
}
Expand All @@ -102,6 +108,18 @@ func (m *Manager) createOrReplaceObserver(cluster types.NamespacedName, settings
return observer
}

// getAndObserveSynchronously looks up the observer registered for the given
// cluster, triggers one synchronous observation on it and returns it.
//
// The manager's read lock is held for the whole call, including the
// observation itself.
//
// Invariant: callers must only invoke this method when an observer is known
// to exist for cluster; the map lookup result is used without a nil check.
func (m *Manager) getAndObserveSynchronously(cluster types.NamespacedName) *Observer {
	m.observerLock.RLock()
	defer m.observerLock.RUnlock()

	current := m.observers[cluster]
	// run the observation inline instead of waiting for the periodic one
	current.observe()
	return current
}

// List returns the names of clusters currently observed
func (m *Manager) List() []types.NamespacedName {
m.observerLock.RLock()
Expand Down
99 changes: 85 additions & 14 deletions pkg/controller/elasticsearch/observer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package observer

import (
"bytes"
"io/ioutil"
"net/http"
"testing"
"time"

Expand All @@ -14,7 +17,9 @@ import (
"k8s.io/apimachinery/pkg/types"

esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/client"
fixtures "github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/client/test_fixtures"
)

func TestManager_List(t *testing.T) {
Expand All @@ -39,7 +44,7 @@ func TestManager_List(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := NewManager(nil)
m := NewManager(0, nil)
m.observers = tt.observers
require.ElementsMatch(t, tt.want, m.List())
})
Expand All @@ -54,7 +59,7 @@ func TestManager_Observe(t *testing.T) {
fakeClient := fakeEsClient200(client.BasicAuth{})
fakeClientWithDifferentUser := fakeEsClient200(client.BasicAuth{Name: "name", Password: "another-one"})
defaultSettings := Settings{
ObservationInterval: defaultObservationInterval,
ObservationInterval: defaultObservationTimeout,
}

tests := []struct {
Expand Down Expand Up @@ -100,7 +105,7 @@ func TestManager_Observe(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := NewManager(nil)
m := NewManager(10*time.Second, nil)
m.observers = tt.initiallyObserved
var initialCreationTime time.Time
if initial, exists := tt.initiallyObserved[tt.clusterToObserve]; exists {
Expand All @@ -121,6 +126,69 @@ func TestManager_Observe(t *testing.T) {
}
}

// flappingEsClient returns a mock Elasticsearch client whose responses
// alternate on every request: the first request gets a 200 carrying the
// health sample fixture, the second a 503 without a body, the third a 200
// again, and so on.
func flappingEsClient() client.Client {
	// failNext records whether the upcoming request must be answered with an error.
	failNext := false

	respond := func(req *http.Request) *http.Response {
		fail := failNext
		// flip the state so that consecutive requests alternate
		failNext = !failNext

		if fail {
			return &http.Response{
				StatusCode: 503,
				Header:     make(http.Header),
				Request:    req,
			}
		}

		body := ioutil.NopCloser(bytes.NewBufferString(fixtures.HealthSample))
		return &http.Response{
			StatusCode: 200,
			Body:       body,
			Header:     make(http.Header),
			Request:    req,
		}
	}

	return client.NewMockClientWithUser(version.MustParse("8.3.0"), client.BasicAuth{}, respond)
}

// TestManager_ObserveSync resolves the observed state of the same cluster
// twice in a row against a flapping client and checks which health values are
// reported, depending on the default observation interval of the manager.
func TestManager_ObserveSync(t *testing.T) {
	type testCase struct {
		name           string
		manager        *Manager
		expectedHealth []esv1.ElasticsearchHealth
	}

	cases := []testCase{
		{
			name:    "Async observation disabled make sync requests every time",
			manager: NewManager(-1*time.Second, nil),
			expectedHealth: []esv1.ElasticsearchHealth{
				esv1.ElasticsearchGreenHealth,
				// the flapping client returns an error on the second request
				esv1.ElasticsearchUnknownHealth,
			},
		},
		{
			name:    "Async observation enabled, only the first request is synchronous",
			manager: NewManager(1*time.Hour, nil),
			expectedHealth: []esv1.ElasticsearchHealth{
				esv1.ElasticsearchGreenHealth,
				// the async observer returns the old observation while the observation interval has not expired
				esv1.ElasticsearchGreenHealth,
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			esClient := flappingEsClient()
			nsName := cluster("es1")
			es := esObject(nsName)

			// two consecutive resolutions, in order, against the same client
			first := tc.manager.ObservedStateResolver(es, esClient)()
			second := tc.manager.ObservedStateResolver(es, esClient)()

			require.Equal(t, tc.expectedHealth, []esv1.ElasticsearchHealth{first, second})

			// stop the observer so that no go-routine outlives the sub-test
			tc.manager.StopObserving(nsName)
		})
	}
}

func TestManager_StopObserving(t *testing.T) {
esClient := fakeEsClient200(client.BasicAuth{})
tests := []struct {
Expand Down Expand Up @@ -163,7 +231,7 @@ func TestManager_StopObserving(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := NewManager(nil)
m := NewManager(10*time.Second, nil)
m.observers = tt.observed
for _, name := range tt.stopObserving {
m.StopObserving(name)
Expand All @@ -174,7 +242,7 @@ func TestManager_StopObserving(t *testing.T) {
}

func TestManager_AddObservationListener(t *testing.T) {
m := NewManager(nil)
m := NewManager(1*time.Second, nil)

cluster1 := esObject(cluster("cluster1"))
cluster1.ObjectMeta.Annotations = map[string]string{ObserverIntervalAnnotation: "0.000001s"}
Expand Down Expand Up @@ -225,25 +293,28 @@ func esObject(n types.NamespacedName) esv1.Elasticsearch {

func TestExtractSettings(t *testing.T) {
testCases := []struct {
name string
annotations map[string]string
want Settings
name string
globalInterval time.Duration
annotations map[string]string
want Settings
}{
{
name: "no annotations",
want: Settings{ObservationInterval: defaultObservationInterval},
name: "no annotations",
globalInterval: 1 * time.Minute,
want: Settings{ObservationInterval: 1 * time.Minute},
},
{
name: "with annotations",
annotations: map[string]string{ObserverIntervalAnnotation: "42s"},
want: Settings{ObservationInterval: 42 * time.Second},
name: "with annotations",
globalInterval: 1 * time.Second,
annotations: map[string]string{ObserverIntervalAnnotation: "42s"},
want: Settings{ObservationInterval: 42 * time.Second},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
es := esv1.Elasticsearch{ObjectMeta: metav1.ObjectMeta{Name: "test", Annotations: tc.annotations}}
m := NewManager(nil)
m := NewManager(tc.globalInterval, nil)
have := m.extractObserverSettings(es)
require.Equal(t, tc.want, have)
})
Expand Down
Loading