diff --git a/pkg/deployment/cluster_informer.go b/pkg/deployment/cluster_informer.go deleted file mode 100644 index e0f4c95a5..000000000 --- a/pkg/deployment/cluster_informer.go +++ /dev/null @@ -1,104 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2018 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// -// Author Ewout Prangsma -// - -package deployment - -import ( - "context" - "time" - - "github.com/arangodb/kube-arangodb/pkg/util/arangod" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -// listenForClusterEvents keep listening for changes entered in the UI of the cluster. -func (d *Deployment) listenForClusterEvents(stopCh <-chan struct{}) { - for { - delay := time.Second * 2 - - // Inspect once - ctx := context.Background() - if err := d.inspectCluster(ctx); err != nil { - d.deps.Log.Debug().Err(err).Msg("Cluster inspection failed") - } - - select { - case <-time.After(delay): - // Continue - case <-stopCh: - // We're done - return - } - } -} - -// Perform a single inspection of the cluster -func (d *Deployment) inspectCluster(ctx context.Context) error { - log := d.deps.Log - c, err := d.clientCache.GetDatabase(ctx) - if err != nil { - return maskAny(err) - } - req, err := arangod.GetNumberOfServers(ctx, c.Connection()) - if err != nil { - log.Debug().Err(err).Msg("Failed to get number of servers") - return maskAny(err) - } - if req.Coordinators == nil && req.DBServers == nil { - // Nothing to check - return nil - } - coordinatorsChanged := false - dbserversChanged := false - d.lastNumberOfServers.mutex.Lock() - defer d.lastNumberOfServers.mutex.Unlock() - desired := d.lastNumberOfServers.NumberOfServers - if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() { - // #Coordinator has changed - coordinatorsChanged = true - } - if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() { - // #DBServers has changed - dbserversChanged = true - } - if !coordinatorsChanged && !dbserversChanged { - // Nothing has changed - return nil - } - // Let's update the spec - current, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.Namespace).Get(d.apiObject.Name, metav1.GetOptions{}) - if err != nil { - log.Debug().Err(err).Msg("Failed to get current deployment") - return maskAny(err) - } - if coordinatorsChanged { - current.Spec.Coordinators.Count = req.GetCoordinators() - } - if dbserversChanged { - current.Spec.DBServers.Count = req.GetDBServers() - } - if err := d.updateCRSpec(current.Spec); err != nil { - log.Warn().Err(err).Msg("Failed to update current deployment") - return maskAny(err) - } - return nil -} diff --git a/pkg/deployment/cluster_scaling_integration.go b/pkg/deployment/cluster_scaling_integration.go new file mode 100644 index 000000000..b8e39bde9 --- /dev/null +++ b/pkg/deployment/cluster_scaling_integration.go @@ -0,0 +1,188 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + "sync" + "time" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" + "github.com/rs/zerolog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// clusterScalingIntegration is a helper to communicate with the clusters +// scaling UI. +type clusterScalingIntegration struct { + log zerolog.Logger + depl *Deployment + pendingUpdate struct { + mutex sync.Mutex + spec *api.DeploymentSpec + } + lastNumberOfServers struct { + arangod.NumberOfServers + mutex sync.Mutex + } +} + +// newClusterScalingIntegration creates a new clusterScalingIntegration. +func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration { + return &clusterScalingIntegration{ + log: depl.deps.Log, + depl: depl, + } +} + +// SendUpdateToCluster records the given spec to be sended to the cluster. +func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) { + ci.pendingUpdate.mutex.Lock() + defer ci.pendingUpdate.mutex.Unlock() + ci.pendingUpdate.spec = &spec +} + +// listenForClusterEvents keep listening for changes entered in the UI of the cluster. +func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct{}) { + for { + delay := time.Second * 2 + + // Is deployment in running state + if ci.depl.status.State == api.DeploymentStateRunning { + // Update cluster with our state + ctx := context.Background() + safeToAskCluster, err := ci.updateClusterServerCount(ctx) + if err != nil { + ci.log.Debug().Err(err).Msg("Cluster update failed") + } else if safeToAskCluster { + // Inspect once + if err := ci.inspectCluster(ctx); err != nil { + ci.log.Debug().Err(err).Msg("Cluster inspection failed") + } + } + } + + select { + case <-time.After(delay): + // Continue + case <-stopCh: + // We're done + return + } + } +} + +// Perform a single inspection of the cluster +func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context) error { + log := ci.log + c, err := ci.depl.clientCache.GetDatabase(ctx) + if err != nil { + return maskAny(err) + } + req, err := arangod.GetNumberOfServers(ctx, c.Connection()) + if err != nil { + log.Debug().Err(err).Msg("Failed to get number of servers") + return maskAny(err) + } + if req.Coordinators == nil && req.DBServers == nil { + // Nothing to check + return nil + } + coordinatorsChanged := false + dbserversChanged := false + ci.lastNumberOfServers.mutex.Lock() + defer ci.lastNumberOfServers.mutex.Unlock() + desired := ci.lastNumberOfServers.NumberOfServers + if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() { + // #Coordinator has changed + coordinatorsChanged = true + } + if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() { + // #DBServers has changed + dbserversChanged = true + } + if !coordinatorsChanged && !dbserversChanged { + // Nothing has changed + return nil + } + // Let's update the spec + apiObject := ci.depl.apiObject + current, err := ci.depl.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(apiObject.Namespace).Get(apiObject.Name, metav1.GetOptions{}) + if err != nil { + log.Debug().Err(err).Msg("Failed to get current deployment") + return maskAny(err) + } + if coordinatorsChanged { + current.Spec.Coordinators.Count = req.GetCoordinators() + } + if dbserversChanged { + current.Spec.DBServers.Count = req.GetDBServers() + } + if err := ci.depl.updateCRSpec(current.Spec); err != nil { + log.Warn().Err(err).Msg("Failed to update current deployment") + return maskAny(err) + } + return nil +} + +// updateClusterServerCount updates the intended number of servers of the cluster. +// Returns true when it is safe to ask the cluster for updates. +func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Context) (bool, error) { + // Any update needed? + ci.pendingUpdate.mutex.Lock() + spec := ci.pendingUpdate.spec + ci.pendingUpdate.mutex.Unlock() + if spec == nil { + // Nothing pending + return true, nil + } + + log := ci.log + c, err := ci.depl.clientCache.GetDatabase(ctx) + if err != nil { + return false, maskAny(err) + } + coordinatorCount := spec.Coordinators.Count + dbserverCount := spec.DBServers.Count + if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil { + log.Debug().Err(err).Msg("Failed to set number of servers") + return false, maskAny(err) + } + + // Success, now update internal state + safeToAskCluster := false + ci.pendingUpdate.mutex.Lock() + if spec == ci.pendingUpdate.spec { + ci.pendingUpdate.spec = nil + safeToAskCluster = true + } + ci.pendingUpdate.mutex.Unlock() + + ci.lastNumberOfServers.mutex.Lock() + defer ci.lastNumberOfServers.mutex.Unlock() + + ci.lastNumberOfServers.Coordinators = &coordinatorCount + ci.lastNumberOfServers.DBServers = &dbserverCount + return safeToAskCluster, nil +} diff --git a/pkg/deployment/cluster_updater.go b/pkg/deployment/cluster_updater.go deleted file mode 100644 index bf6d1a599..000000000 --- a/pkg/deployment/cluster_updater.go +++ /dev/null @@ -1,50 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2018 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// -// Author Ewout Prangsma -// - -package deployment - -import ( - "context" - - "github.com/arangodb/kube-arangodb/pkg/util/arangod" -) - -// updateClusterServerCount updates the intended number of servers of the cluster. -func (d *Deployment) updateClusterServerCount(ctx context.Context) error { - log := d.deps.Log - c, err := d.clientCache.GetDatabase(ctx) - if err != nil { - return maskAny(err) - } - spec := d.apiObject.Spec - coordinatorCount := spec.Coordinators.Count - dbserverCount := spec.DBServers.Count - if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil { - log.Debug().Err(err).Msg("Failed to set number of servers") - return maskAny(err) - } - d.lastNumberOfServers.mutex.Lock() - defer d.lastNumberOfServers.mutex.Unlock() - d.lastNumberOfServers.Coordinators = &coordinatorCount - d.lastNumberOfServers.DBServers = &dbserverCount - return nil -} diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index 4ec66e7ee..e842964b4 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -23,10 +23,8 @@ package deployment import ( - "context" "fmt" "reflect" - "sync" "time" "github.com/rs/zerolog" @@ -38,7 +36,6 @@ import ( api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" - "github.com/arangodb/kube-arangodb/pkg/util/arangod" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/arangodb/kube-arangodb/pkg/util/retry" "github.com/arangodb/kube-arangodb/pkg/util/trigger" @@ -91,13 +88,10 @@ type Deployment struct { eventsCli corev1.EventInterface - inspectTrigger trigger.Trigger - clientCache *clientCache - recentInspectionErrors int - lastNumberOfServers struct { - arangod.NumberOfServers - mutex sync.Mutex - } + inspectTrigger trigger.Trigger + clientCache *clientCache + recentInspectionErrors int + clusterScalingIntegration *clusterScalingIntegration } // New creates a new Deployment from the given API object. @@ -119,7 +113,9 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De go d.run() go d.listenForPodEvents() if apiObject.Spec.Mode == api.DeploymentModeCluster { - go d.listenForClusterEvents(d.stopCh) + ci := newClusterScalingIntegration(d) + d.clusterScalingIntegration = ci + go ci.ListenForClusterEvents(d.stopCh) } return d, nil @@ -280,11 +276,8 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent) } // Notify cluster of desired server count - if d.apiObject.Spec.Mode == api.DeploymentModeCluster { - ctx := context.Background() - if err := d.updateClusterServerCount(ctx); err != nil { - log.Error().Err(err).Msg("Failed to update desired server count in cluster") - } + if ci := d.clusterScalingIntegration; ci != nil { + ci.SendUpdateToCluster(d.apiObject.Spec) } // Trigger inspect