diff --git a/pkg/deployment/cluster_informer.go b/pkg/deployment/cluster_informer.go new file mode 100644 index 000000000..e0f4c95a5 --- /dev/null +++ b/pkg/deployment/cluster_informer.go @@ -0,0 +1,104 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + "time" + + "github.com/arangodb/kube-arangodb/pkg/util/arangod" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// listenForClusterEvents keep listening for changes entered in the UI of the cluster. +func (d *Deployment) listenForClusterEvents(stopCh <-chan struct{}) { + for { + delay := time.Second * 2 + + // Inspect once + ctx := context.Background() + if err := d.inspectCluster(ctx); err != nil { + d.deps.Log.Debug().Err(err).Msg("Cluster inspection failed") + } + + select { + case <-time.After(delay): + // Continue + case <-stopCh: + // We're done + return + } + } +} + +// Perform a single inspection of the cluster +func (d *Deployment) inspectCluster(ctx context.Context) error { + log := d.deps.Log + c, err := d.clientCache.GetDatabase(ctx) + if err != nil { + return maskAny(err) + } + req, err := arangod.GetNumberOfServers(ctx, c.Connection()) + if err != nil { + log.Debug().Err(err).Msg("Failed to get number of servers") + return maskAny(err) + } + if req.Coordinators == nil && req.DBServers == nil { + // Nothing to check + return nil + } + coordinatorsChanged := false + dbserversChanged := false + d.lastNumberOfServers.mutex.Lock() + defer d.lastNumberOfServers.mutex.Unlock() + desired := d.lastNumberOfServers.NumberOfServers + if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() { + // #Coordinator has changed + coordinatorsChanged = true + } + if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() { + // #DBServers has changed + dbserversChanged = true + } + if !coordinatorsChanged && !dbserversChanged { + // Nothing has changed + return nil + } + // Let's update the spec + current, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.Namespace).Get(d.apiObject.Name, metav1.GetOptions{}) + if err != nil { + log.Debug().Err(err).Msg("Failed to get current deployment") + return maskAny(err) + } + if coordinatorsChanged { + current.Spec.Coordinators.Count = req.GetCoordinators() + } + if dbserversChanged { + current.Spec.DBServers.Count = req.GetDBServers() + } + if err := d.updateCRSpec(current.Spec); err != nil { + log.Warn().Err(err).Msg("Failed to update current deployment") + return maskAny(err) + } + return nil +} diff --git a/pkg/deployment/cluster_updater.go b/pkg/deployment/cluster_updater.go new file mode 100644 index 000000000..bf6d1a599 --- /dev/null +++ b/pkg/deployment/cluster_updater.go @@ -0,0 +1,50 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + + "github.com/arangodb/kube-arangodb/pkg/util/arangod" +) + +// updateClusterServerCount updates the intended number of servers of the cluster. +func (d *Deployment) updateClusterServerCount(ctx context.Context) error { + log := d.deps.Log + c, err := d.clientCache.GetDatabase(ctx) + if err != nil { + return maskAny(err) + } + spec := d.apiObject.Spec + coordinatorCount := spec.Coordinators.Count + dbserverCount := spec.DBServers.Count + if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil { + log.Debug().Err(err).Msg("Failed to set number of servers") + return maskAny(err) + } + d.lastNumberOfServers.mutex.Lock() + defer d.lastNumberOfServers.mutex.Unlock() + d.lastNumberOfServers.Coordinators = &coordinatorCount + d.lastNumberOfServers.DBServers = &dbserverCount + return nil +} diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index 328977043..4ec66e7ee 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -23,8 +23,10 @@ package deployment import ( + "context" "fmt" "reflect" + "sync" "time" "github.com/rs/zerolog" @@ -36,6 +38,7 @@ import ( api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/arangodb/kube-arangodb/pkg/util/retry" "github.com/arangodb/kube-arangodb/pkg/util/trigger" @@ -89,8 +92,12 @@ type Deployment struct { eventsCli corev1.EventInterface inspectTrigger trigger.Trigger - recentInspectionErrors int clientCache *clientCache + recentInspectionErrors int + lastNumberOfServers struct { + arangod.NumberOfServers + mutex sync.Mutex + } } // New creates a new Deployment from the given API object. @@ -111,6 +118,9 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De go d.run() go d.listenForPodEvents() + if apiObject.Spec.Mode == api.DeploymentModeCluster { + go d.listenForClusterEvents(d.stopCh) + } return d, nil } @@ -269,6 +279,14 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent) return maskAny(fmt.Errorf("failed to update ArangoDeployment spec: %v", err)) } + // Notify cluster of desired server count + if d.apiObject.Spec.Mode == api.DeploymentModeCluster { + ctx := context.Background() + if err := d.updateClusterServerCount(ctx); err != nil { + log.Error().Err(err).Msg("Failed to update desired server count in cluster") + } + } + // Trigger inspect d.inspectTrigger.Trigger() diff --git a/pkg/util/arangod/cluster.go b/pkg/util/arangod/cluster.go new file mode 100644 index 000000000..a75601499 --- /dev/null +++ b/pkg/util/arangod/cluster.go @@ -0,0 +1,94 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package arangod + +import ( + "context" + + driver "github.com/arangodb/go-driver" +) + +// NumberOfServers is the JSON structure return for the numberOfServers API call. +type NumberOfServers struct { + Coordinators *int `json:"numberOfCoordinators,omitempty"` + DBServers *int `json:"numberOfDBServers,omitempty"` +} + +// GetCoordinators returns Coordinators if not nil, otherwise 0. +func (n NumberOfServers) GetCoordinators() int { + if n.Coordinators != nil { + return *n.Coordinators + } + return 0 +} + +// GetDBServers returns DBServers if not nil, otherwise 0. +func (n NumberOfServers) GetDBServers() int { + if n.DBServers != nil { + return *n.DBServers + } + return 0 +} + +// GetNumberOfServers fetches the number of servers the cluster wants to have. +func GetNumberOfServers(ctx context.Context, conn driver.Connection) (NumberOfServers, error) { + req, err := conn.NewRequest("GET", "_admin/cluster/numberOfServers") + if err != nil { + return NumberOfServers{}, maskAny(err) + } + resp, err := conn.Do(ctx, req) + if err != nil { + return NumberOfServers{}, maskAny(err) + } + if err := resp.CheckStatus(200); err != nil { + return NumberOfServers{}, maskAny(err) + } + var result NumberOfServers + if err := resp.ParseBody("", &result); err != nil { + return NumberOfServers{}, maskAny(err) + } + return result, nil +} + +// SetNumberOfServers updates the number of servers the cluster has. +func SetNumberOfServers(ctx context.Context, conn driver.Connection, noCoordinators, noDBServers int) error { + req, err := conn.NewRequest("PUT", "_admin/cluster/numberOfServers") + if err != nil { + return maskAny(err) + } + input := NumberOfServers{ + Coordinators: &noCoordinators, + DBServers: &noDBServers, + } + if _, err := req.SetBody(input); err != nil { + return maskAny(err) + } + resp, err := conn.Do(ctx, req) + if err != nil { + return maskAny(err) + } + if err := resp.CheckStatus(200); err != nil { + return maskAny(err) + } + return nil +}