Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch observer improvements to avoid blocking between workers. #6084

Merged
merged 5 commits into from
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/controller/elasticsearch/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
if min == nil {
min = &d.Version
}

isServiceReady, err := services.IsServiceReady(d.Client, *internalService)
if err != nil {
return results.WithError(err)
}

observedState := d.Observers.ObservedStateResolver(
ctx,
d.ES,
Expand All @@ -191,6 +197,7 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
*min,
trustedHTTPCertificates,
),
isServiceReady,
)

// Always update the Elasticsearch state bits with the latest observed state.
Expand Down Expand Up @@ -233,11 +240,6 @@ func (d *defaultDriver) Reconcile(ctx context.Context) *reconciler.Results {
)
defer esClient.Close()

isServiceReady, err := services.IsServiceReady(d.Client, *internalService)
if err != nil {
return results.WithError(err)
}

// use unknown health as a proxy for a cluster not responding to requests
hasKnownHealthState := observedState() != esv1.ElasticsearchUnknownHealth
esReachable := isServiceReady && hasKnownHealthState
Expand Down
43 changes: 30 additions & 13 deletions pkg/controller/elasticsearch/observer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

esv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/annotation"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
)
Expand Down Expand Up @@ -44,8 +45,13 @@ func NewManager(defaultInterval time.Duration, tracer *apm.Tracer) *Manager {

// ObservedStateResolver returns a function that returns the last known state of the given cluster,
// as expected by the main reconciliation driver
func (m *Manager) ObservedStateResolver(ctx context.Context, cluster esv1.Elasticsearch, esClient client.Client) func() esv1.ElasticsearchHealth {
observer := m.Observe(ctx, cluster, esClient)
func (m *Manager) ObservedStateResolver(
ctx context.Context,
cluster esv1.Elasticsearch,
esClient client.Client,
isServiceReady bool,
) func() esv1.ElasticsearchHealth {
observer := m.Observe(ctx, cluster, esClient, isServiceReady)
return func() esv1.ElasticsearchHealth {
return observer.LastHealth()
}
Expand All @@ -61,24 +67,36 @@ func (m *Manager) getObserver(key types.NamespacedName) (*Observer, bool) {

// Observe gets or create a cluster state observer for the given cluster
// In case something has changed in the given esClient (eg. different caCert), the observer is recreated accordingly
func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esClient client.Client) *Observer {
func (m *Manager) Observe(ctx context.Context, cluster esv1.Elasticsearch, esClient client.Client, isServiceReady bool) *Observer {
defer tracing.Span(&ctx)()
nsName := k8s.ExtractNamespacedName(&cluster)
settings := m.extractObserverSettings(ctx, cluster)

observer, exists := m.getObserver(nsName)

switch {
case !exists:
return m.createOrReplaceObserver(nsName, settings, esClient)
// This Elasticsearch resource has not being observed yet, create the observer and maybe do a first observation.
observer = m.createOrReplaceObserver(ctx, nsName, settings, esClient)
case exists && (!observer.esClient.Equal(esClient) || observer.settings != settings):
return m.createOrReplaceObserver(nsName, settings, esClient)
// This Elasticsearch resource is already being observed asynchronously, no need to do a first observation.
observer = m.createOrReplaceObserver(ctx, nsName, settings, esClient)
case exists && settings.ObservationInterval <= 0:
// in case asynchronous observation has been disabled ensure at least one observation at reconciliation time.
return m.getAndObserveSynchronously(nsName)
return m.getAndObserveSynchronously(ctx, nsName)
default:
// No change, close the provided Client and return the existing observer.
esClient.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had forgotten that. It's not great. I opened:

return observer
}

if !exists && isServiceReady {
// there was no existing observer and Service is ready: let's try an initial synchronous observation
observer.observe(ctx)
}
// start the new observer
observer.Start()
return observer
}

// extractObserverSettings extracts observer settings from the annotations on the Elasticsearch resource.
Expand All @@ -90,7 +108,9 @@ func (m *Manager) extractObserverSettings(ctx context.Context, cluster esv1.Elas
}

// createOrReplaceObserver creates a new observer and adds it to the observers map, replacing existing observers if necessary.
func (m *Manager) createOrReplaceObserver(cluster types.NamespacedName, settings Settings, esClient client.Client) *Observer {
// The new observer is not started, it is up to the caller to invoke observer.Start(ctx)
func (m *Manager) createOrReplaceObserver(ctx context.Context, cluster types.NamespacedName, settings Settings, esClient client.Client) *Observer {
defer tracing.Span(&ctx)()
m.observerLock.Lock()
defer m.observerLock.Unlock()

Expand All @@ -100,24 +120,21 @@ func (m *Manager) createOrReplaceObserver(cluster types.NamespacedName, settings
observer.Stop()
delete(m.observers, cluster)
}

observer = NewObserver(cluster, esClient, settings, m.notifyListeners)
observer.Start()

m.observers[cluster] = observer

return observer
}

// getAndObserveSynchronously retrieves the currently configured observer and trigger a synchronous observation.
func (m *Manager) getAndObserveSynchronously(cluster types.NamespacedName) *Observer {
func (m *Manager) getAndObserveSynchronously(ctx context.Context, cluster types.NamespacedName) *Observer {
defer tracing.Span(&ctx)()
m.observerLock.RLock()
defer m.observerLock.RUnlock()

// invariant: this method must only be called when existence of observer is given
observer := m.observers[cluster]
// force a synchronous observation
observer.observe()
observer.observe(ctx)
return observer
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/elasticsearch/observer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestManager_Observe(t *testing.T) {
if initial, exists := tt.initiallyObserved[tt.clusterToObserve]; exists {
initialCreationTime = initial.creationTime
}
observer := m.Observe(context.Background(), esObject(tt.clusterToObserve), tt.clusterToObserveClient)
observer := m.Observe(context.Background(), esObject(tt.clusterToObserve), tt.clusterToObserveClient, true)
// returned observer should be the correct one
require.Equal(t, tt.clusterToObserve, observer.cluster)
// list of observers should have been updated
Expand Down Expand Up @@ -181,8 +181,8 @@ func TestManager_ObserveSync(t *testing.T) {
name := cluster("es1")
cluster := esObject(name)
results := []esv1.ElasticsearchHealth{
tt.manager.ObservedStateResolver(context.Background(), cluster, esClient)(),
tt.manager.ObservedStateResolver(context.Background(), cluster, esClient)(),
tt.manager.ObservedStateResolver(context.Background(), cluster, esClient, true)(),
tt.manager.ObservedStateResolver(context.Background(), cluster, esClient, true)(),
}
require.Equal(t, tt.expectedHealth, results)
tt.manager.StopObserving(name) // let's clean up the go-routines
Expand Down Expand Up @@ -277,9 +277,9 @@ func TestManager_AddObservationListener(t *testing.T) {
close(doneCh)
}()
// observe 2 clusters
obs1 := m.Observe(ctx, cluster1, fakeEsClient200(client.BasicAuth{}))
obs1 := m.Observe(ctx, cluster1, fakeEsClient200(client.BasicAuth{}), true)
defer obs1.Stop()
obs2 := m.Observe(ctx, cluster2, fakeEsClient200(client.BasicAuth{}))
obs2 := m.Observe(ctx, cluster2, fakeEsClient200(client.BasicAuth{}), true)
defer obs2.Stop()
<-doneCh
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/controller/elasticsearch/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ func NewObserver(cluster types.NamespacedName, esClient client.Client, settings
// The first observation is synchronous to allow to retrieve the cluster state immediately after the start.
// Then, observations are performed periodically in a separate goroutine until the observer stop channel is closed.
func (o *Observer) Start() {
// initial synchronous observation
o.observe()
if o.settings.ObservationInterval <= 0 {
return // asynchronous observations are effectively disabled
}
Expand All @@ -85,7 +83,7 @@ func (o *Observer) Start() {
for {
select {
case <-ticker.C:
o.observe()
o.observe(context.Background())
case <-o.stopChan:
log.Info("Stopping observer for cluster", "namespace", o.cluster.Namespace, "es_name", o.cluster.Name)
return
Expand All @@ -111,11 +109,12 @@ func (o *Observer) LastHealth() esv1.ElasticsearchHealth {

// observe retrieves the current ES state, executes onObservation,
// and stores the new state
func (o *Observer) observe() {
ctx, cancelFunc := context.WithTimeout(context.Background(), nonNegativeTimeout(o.settings.ObservationInterval))
func (o *Observer) observe(ctx context.Context) {
defer tracing.Span(&ctx)()
ctx, cancelFunc := context.WithTimeout(ctx, nonNegativeTimeout(o.settings.ObservationInterval))
defer cancelFunc()

if o.settings.Tracer != nil {
if o.settings.Tracer != nil && apm.TransactionFromContext(ctx) == nil {
tx := o.settings.Tracer.StartTransaction(name, string(tracing.PeriodicTxType))
defer tx.End()
ctx = apm.ContextWithTransaction(ctx, tx)
Expand All @@ -128,10 +127,13 @@ func (o *Observer) observe() {
if o.onObservation != nil {
o.onObservation(o.cluster, o.LastHealth(), newHealth)
}
o.updateHealth(newHealth)
}

// updateHealth records newHealth as the last observed cluster health,
// guarded by the observer's mutex so concurrent readers (LastHealth) are safe.
func (o *Observer) updateHealth(newHealth esv1.ElasticsearchHealth) {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	o.lastHealth = newHealth
}

func nonNegativeTimeout(observationInterval time.Duration) time.Duration {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/elasticsearch/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ func TestObserver_observe(t *testing.T) {
esClient: fakeEsClient,
onObservation: onObservation,
}
observer.observe()
observer.observe(context.Background())
require.Equal(t, int32(1), atomic.LoadInt32(&counter))
observer.observe()
observer.observe(context.Background())
require.Equal(t, int32(2), atomic.LoadInt32(&counter))
}

Expand All @@ -69,7 +69,7 @@ func TestObserver_observe_nilFunction(t *testing.T) {
onObservation: nilFunc,
}
// should not panic
observer.observe()
observer.observe(context.Background())
}

func TestNewObserver(t *testing.T) {
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestObserver_Stop(t *testing.T) {
}
observer := createAndRunTestObserver(onObservation)
// force at least one observation
observer.observe()
observer.observe(context.Background())
// stop the observer
observer.Stop()
// should be safe to call multiple times
Expand Down