Skip to content

Commit

Permalink
Merge pull request #65 from arangodb/ui-integration
Browse files Browse the repository at this point in the history
Integrate with scaling web-UI
  • Loading branch information
ewoutp committed Mar 23, 2018
2 parents 8b01a8a + 7865248 commit da9e477
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 1 deletion.
104 changes: 104 additions & 0 deletions pkg/deployment/cluster_informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package deployment

import (
"context"
"time"

"github.com/arangodb/kube-arangodb/pkg/util/arangod"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
func (d *Deployment) listenForClusterEvents(stopCh <-chan struct{}) {
for {
delay := time.Second * 2

// Inspect once
ctx := context.Background()
if err := d.inspectCluster(ctx); err != nil {
d.deps.Log.Debug().Err(err).Msg("Cluster inspection failed")
}

select {
case <-time.After(delay):
// Continue
case <-stopCh:
// We're done
return
}
}
}

// Perform a single inspection of the cluster
func (d *Deployment) inspectCluster(ctx context.Context) error {
log := d.deps.Log
c, err := d.clientCache.GetDatabase(ctx)
if err != nil {
return maskAny(err)
}
req, err := arangod.GetNumberOfServers(ctx, c.Connection())
if err != nil {
log.Debug().Err(err).Msg("Failed to get number of servers")
return maskAny(err)
}
if req.Coordinators == nil && req.DBServers == nil {
// Nothing to check
return nil
}
coordinatorsChanged := false
dbserversChanged := false
d.lastNumberOfServers.mutex.Lock()
defer d.lastNumberOfServers.mutex.Unlock()
desired := d.lastNumberOfServers.NumberOfServers
if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
// #Coordinator has changed
coordinatorsChanged = true
}
if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
// #DBServers has changed
dbserversChanged = true
}
if !coordinatorsChanged && !dbserversChanged {
// Nothing has changed
return nil
}
// Let's update the spec
current, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.Namespace).Get(d.apiObject.Name, metav1.GetOptions{})
if err != nil {
log.Debug().Err(err).Msg("Failed to get current deployment")
return maskAny(err)
}
if coordinatorsChanged {
current.Spec.Coordinators.Count = req.GetCoordinators()
}
if dbserversChanged {
current.Spec.DBServers.Count = req.GetDBServers()
}
if err := d.updateCRSpec(current.Spec); err != nil {
log.Warn().Err(err).Msg("Failed to update current deployment")
return maskAny(err)
}
return nil
}
50 changes: 50 additions & 0 deletions pkg/deployment/cluster_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package deployment

import (
"context"

"github.com/arangodb/kube-arangodb/pkg/util/arangod"
)

// updateClusterServerCount updates the intended number of servers of the cluster.
func (d *Deployment) updateClusterServerCount(ctx context.Context) error {
log := d.deps.Log
c, err := d.clientCache.GetDatabase(ctx)
if err != nil {
return maskAny(err)
}
spec := d.apiObject.Spec
coordinatorCount := spec.Coordinators.Count
dbserverCount := spec.DBServers.Count
if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil {
log.Debug().Err(err).Msg("Failed to set number of servers")
return maskAny(err)
}
d.lastNumberOfServers.mutex.Lock()
defer d.lastNumberOfServers.mutex.Unlock()
d.lastNumberOfServers.Coordinators = &coordinatorCount
d.lastNumberOfServers.DBServers = &dbserverCount
return nil
}
20 changes: 19 additions & 1 deletion pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
package deployment

import (
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -36,6 +38,7 @@ import (

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
Expand Down Expand Up @@ -89,8 +92,12 @@ type Deployment struct {
eventsCli corev1.EventInterface

inspectTrigger trigger.Trigger
recentInspectionErrors int
clientCache *clientCache
recentInspectionErrors int
lastNumberOfServers struct {
arangod.NumberOfServers
mutex sync.Mutex
}
}

// New creates a new Deployment from the given API object.
Expand All @@ -111,6 +118,9 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De

go d.run()
go d.listenForPodEvents()
if apiObject.Spec.Mode == api.DeploymentModeCluster {
go d.listenForClusterEvents(d.stopCh)
}

return d, nil
}
Expand Down Expand Up @@ -269,6 +279,14 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent)
return maskAny(fmt.Errorf("failed to update ArangoDeployment spec: %v", err))
}

// Notify cluster of desired server count
if d.apiObject.Spec.Mode == api.DeploymentModeCluster {
ctx := context.Background()
if err := d.updateClusterServerCount(ctx); err != nil {
log.Error().Err(err).Msg("Failed to update desired server count in cluster")
}
}

// Trigger inspect
d.inspectTrigger.Trigger()

Expand Down
94 changes: 94 additions & 0 deletions pkg/util/arangod/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package arangod

import (
"context"

driver "github.com/arangodb/go-driver"
)

// NumberOfServers is the JSON structure return for the numberOfServers API call.
type NumberOfServers struct {
Coordinators *int `json:"numberOfCoordinators,omitempty"`
DBServers *int `json:"numberOfDBServers,omitempty"`
}

// GetCoordinators returns Coordinators if not nil, otherwise 0.
func (n NumberOfServers) GetCoordinators() int {
if n.Coordinators != nil {
return *n.Coordinators
}
return 0
}

// GetDBServers returns DBServers if not nil, otherwise 0.
func (n NumberOfServers) GetDBServers() int {
if n.DBServers != nil {
return *n.DBServers
}
return 0
}

// GetNumberOfServers fetches the number of servers the cluster wants to have.
func GetNumberOfServers(ctx context.Context, conn driver.Connection) (NumberOfServers, error) {
req, err := conn.NewRequest("GET", "_admin/cluster/numberOfServers")
if err != nil {
return NumberOfServers{}, maskAny(err)
}
resp, err := conn.Do(ctx, req)
if err != nil {
return NumberOfServers{}, maskAny(err)
}
if err := resp.CheckStatus(200); err != nil {
return NumberOfServers{}, maskAny(err)
}
var result NumberOfServers
if err := resp.ParseBody("", &result); err != nil {
return NumberOfServers{}, maskAny(err)
}
return result, nil
}

// SetNumberOfServers updates the number of servers the cluster has.
func SetNumberOfServers(ctx context.Context, conn driver.Connection, noCoordinators, noDBServers int) error {
req, err := conn.NewRequest("PUT", "_admin/cluster/numberOfServers")
if err != nil {
return maskAny(err)
}
input := NumberOfServers{
Coordinators: &noCoordinators,
DBServers: &noDBServers,
}
if _, err := req.SetBody(input); err != nil {
return maskAny(err)
}
resp, err := conn.Do(ctx, req)
if err != nil {
return maskAny(err)
}
if err := resp.CheckStatus(200); err != nil {
return maskAny(err)
}
return nil
}

0 comments on commit da9e477

Please sign in to comment.