Skip to content

Commit

Permalink
Add scale up/down controls on deployments, replica sets, and replicat…
Browse files Browse the repository at this point in the history
…ion controllers (#1451)
  • Loading branch information
paulbellamy authored and tomwilkie committed May 10, 2016
1 parent b624df6 commit 291c9af
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 51 deletions.
69 changes: 49 additions & 20 deletions probe/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,22 @@ type Client interface {
WalkServices(f func(Service) error) error
WalkDeployments(f func(Deployment) error) error
WalkReplicaSets(f func(ReplicaSet) error) error
WalkReplicationControllers(f func(ReplicationController) error) error
WalkNodes(f func(*api.Node) error) error

WatchPods(f func(Event, Pod))

GetLogs(namespaceID, podID string) (io.ReadCloser, error)
DeletePod(namespaceID, podID string) error
ScaleUp(resource, namespaceID, id string) error
ScaleDown(resource, namespaceID, id string) error
}

type client struct {
quit chan struct{}
resyncPeriod time.Duration
client *unversioned.Client
extensionsClient *unversioned.ExtensionsClient
podStore *cache.StoreToPodLister
serviceStore *cache.StoreToServiceLister
deploymentStore *cache.StoreToDeploymentLister
Expand Down Expand Up @@ -85,9 +89,10 @@ func NewClient(addr string, resyncPeriod time.Duration) (Client, error) {
}

result := &client{
quit: make(chan struct{}),
resyncPeriod: resyncPeriod,
client: c,
quit: make(chan struct{}),
resyncPeriod: resyncPeriod,
client: c,
extensionsClient: ec,
}

result.podStore = &cache.StoreToPodLister{Store: result.setupStore(c, "pods", &api.Pod{})}
Expand Down Expand Up @@ -159,30 +164,31 @@ func (c *client) WalkDeployments(f func(Deployment) error) error {
return nil
}

// WalkReplicaSets calls f for each replica set (and replication controller)
// WalkReplicaSets calls f for each replica set
func (c *client) WalkReplicaSets(f func(ReplicaSet) error) error {
{
list, err := c.replicaSetStore.List()
if err != nil {
list, err := c.replicaSetStore.List()
if err != nil {
return err
}
for i := range list {
if err := f(NewReplicaSet(&(list[i]))); err != nil {
return err
}
for i := range list {
if err := f(NewReplicaSet(&(list[i]))); err != nil {
return err
}
}
}
return nil

{
list, err := c.replicationControllerStore.List()
if err != nil {
}

// WalkReplicationcontrollers calls f for each replication controller
func (c *client) WalkReplicationControllers(f func(ReplicationController) error) error {
list, err := c.replicationControllerStore.List()
if err != nil {
return err
}
for i := range list {
if err := f(NewReplicationController(&(list[i]))); err != nil {
return err
}
for i := range list {
if err := f(NewReplicationController(&(list[i]))); err != nil {
return err
}
}
}
return nil
}
Expand Down Expand Up @@ -219,6 +225,29 @@ func (c *client) DeletePod(namespaceID, podID string) error {
Resource("pods").Do().Error()
}

func (c *client) ScaleUp(resource, namespaceID, id string) error {
return c.modifyScale(resource, namespaceID, id, func(scale *extensions.Scale) {
scale.Spec.Replicas++
})
}

func (c *client) ScaleDown(resource, namespaceID, id string) error {
return c.modifyScale(resource, namespaceID, id, func(scale *extensions.Scale) {
scale.Spec.Replicas--
})
}

func (c *client) modifyScale(resource, namespace, id string, f func(*extensions.Scale)) error {
scaler := c.extensionsClient.Scales(namespace)
scale, err := scaler.Get(resource, id)
if err != nil {
return err
}
f(scale)
_, err = scaler.Update(resource, scale)
return err
}

func (c *client) Stop() {
close(c.quit)
}
75 changes: 75 additions & 0 deletions probe/kubernetes/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
const (
GetLogs = "kubernetes_get_logs"
DeletePod = "kubernetes_delete_pod"
ScaleUp = "kubernetes_scale_up"
ScaleDown = "kubernetes_scale_down"
)

// GetLogs is the control to get the logs for a kubernetes pod
Expand Down Expand Up @@ -72,12 +74,85 @@ func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response
}
}

// CaptureResource is exported for testing
func (r *Reporter) CaptureResource(f func(xfer.Request, string, string, string) xfer.Response) func(xfer.Request) xfer.Response {
return func(req xfer.Request) xfer.Response {
var resource, uid string
for _, parser := range []struct {
res string
f func(string) (string, bool)
}{
{report.Deployment, report.ParseDeploymentNodeID},
{report.ReplicaSet, report.ParseReplicaSetNodeID},
} {
if u, ok := parser.f(req.NodeID); ok {
resource, uid = parser.res, u
break
}
}
if resource == "" {
return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID)
}

switch resource {
case report.Deployment:
var deployment Deployment
r.client.WalkDeployments(func(d Deployment) error {
if d.UID() == uid {
deployment = d
}
return nil
})
if deployment != nil {
return f(req, "deployment", deployment.Namespace(), deployment.Name())
}
case report.ReplicaSet:
var replicaSet ReplicaSet
var res string
r.client.WalkReplicaSets(func(r ReplicaSet) error {
if r.UID() == uid {
replicaSet = r
res = "replicaset"
}
return nil
})
if replicaSet == nil {
r.client.WalkReplicationControllers(func(r ReplicationController) error {
if r.UID() == uid {
replicaSet = ReplicaSet(r)
res = "replicationcontroller"
}
return nil
})
}
if replicaSet != nil {
return f(req, res, replicaSet.Namespace(), replicaSet.Name())
}
}
return xfer.ResponseErrorf("%s not found: %s", resource, uid)
}
}

// ScaleUp is the control to scale up a deployment
func (r *Reporter) ScaleUp(req xfer.Request, resource, namespace, id string) xfer.Response {
return xfer.ResponseError(r.client.ScaleUp(resource, namespace, id))
}

// ScaleDown is the control to scale up a deployment
func (r *Reporter) ScaleDown(req xfer.Request, resource, namespace, id string) xfer.Response {
return xfer.ResponseError(r.client.ScaleDown(resource, namespace, id))
}

func (r *Reporter) registerControls() {
controls.Register(GetLogs, r.CapturePod(r.GetLogs))
controls.Register(DeletePod, r.CapturePod(r.deletePod))
controls.Register(ScaleUp, r.CaptureResource(r.ScaleUp))
controls.Register(ScaleDown, r.CaptureResource(r.ScaleDown))
}

func (r *Reporter) deregisterControls() {
controls.Rm(GetLogs)
controls.Rm(DeletePod)
controls.Rm(ScaleUp)
controls.Rm(ScaleDown)
}
2 changes: 1 addition & 1 deletion probe/kubernetes/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ func (d *deployment) GetNode(probeID string) report.Node {
UnavailableReplicas: fmt.Sprint(d.Status.UnavailableReplicas),
Strategy: string(d.Spec.Strategy.Type),
report.ControlProbeID: probeID,
})
}).WithControls(ScaleUp, ScaleDown)
}
2 changes: 1 addition & 1 deletion probe/kubernetes/replica_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ func (r *replicaSet) GetNode(probeID string) report.Node {
DesiredReplicas: fmt.Sprint(r.Spec.Replicas),
FullyLabeledReplicas: fmt.Sprint(r.Status.FullyLabeledReplicas),
report.ControlProbeID: probeID,
}).WithParents(r.parents)
}).WithParents(r.parents).WithControls(ScaleUp, ScaleDown)
}
12 changes: 10 additions & 2 deletions probe/kubernetes/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import (
"k8s.io/kubernetes/pkg/labels"
)

// ReplicationController represents a Kubernetes replication controller
type ReplicationController interface {
Meta
Selector() labels.Selector
AddParent(topology, id string)
GetNode(probeID string) report.Node
}

type replicationController struct {
*api.ReplicationController
Meta
Expand All @@ -16,7 +24,7 @@ type replicationController struct {
}

// NewReplicationController creates a new ReplicationController
func NewReplicationController(r *api.ReplicationController) ReplicaSet {
func NewReplicationController(r *api.ReplicationController) ReplicationController {
return &replicationController{
ReplicationController: r,
Meta: meta{r.ObjectMeta},
Expand All @@ -42,5 +50,5 @@ func (r *replicationController) GetNode(probeID string) report.Node {
DesiredReplicas: fmt.Sprint(r.Spec.Replicas),
FullyLabeledReplicas: fmt.Sprint(r.Status.FullyLabeledReplicas),
report.ControlProbeID: probeID,
}).WithParents(r.parents)
}).WithParents(r.parents).WithControls(ScaleUp, ScaleDown)
}
31 changes: 31 additions & 0 deletions probe/kubernetes/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ var (
TableTemplates = report.TableTemplates{
LabelPrefix: {ID: LabelPrefix, Label: "Kubernetes Labels", Prefix: LabelPrefix},
}

ScalingControls = []report.Control{
{
ID: ScaleDown,
Human: "Scale Down",
Icon: "fa-minus",
Rank: 0,
},
{
ID: ScaleUp,
Human: "Scale Up",
Icon: "fa-plus",
Rank: 1,
},
}
)

// Reporter generate Reports containing Container and ContainerImage topologies
Expand Down Expand Up @@ -206,6 +221,8 @@ func (r *Reporter) deploymentTopology(probeID string) (report.Topology, []Deploy
WithTableTemplates(TableTemplates)
deployments = []Deployment{}
)
result.Controls.AddControls(ScalingControls)

err := r.client.WalkDeployments(func(d Deployment) error {
result = result.AddNode(d.GetNode(probeID))
deployments = append(deployments, d)
Expand All @@ -222,6 +239,8 @@ func (r *Reporter) replicaSetTopology(probeID string, deployments []Deployment)
replicaSets = []ReplicaSet{}
selectors = []func(labelledChild){}
)
result.Controls.AddControls(ScalingControls)

for _, deployment := range deployments {
selectors = append(selectors, match(
deployment.Selector(),
Expand All @@ -238,6 +257,18 @@ func (r *Reporter) replicaSetTopology(probeID string, deployments []Deployment)
replicaSets = append(replicaSets, r)
return nil
})
if err != nil {
return result, replicaSets, err
}

err = r.client.WalkReplicationControllers(func(r ReplicationController) error {
for _, selector := range selectors {
selector(r)
}
result = result.AddNode(r.GetNode(probeID))
replicaSets = append(replicaSets, ReplicaSet(r))
return nil
})
return result, replicaSets, err
}

Expand Down
9 changes: 9 additions & 0 deletions probe/kubernetes/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (c *mockClient) WalkDeployments(f func(kubernetes.Deployment) error) error
func (c *mockClient) WalkReplicaSets(f func(kubernetes.ReplicaSet) error) error {
return nil
}
func (c *mockClient) WalkReplicationControllers(f func(kubernetes.ReplicationController) error) error {
return nil
}
func (*mockClient) WalkNodes(f func(*api.Node) error) error {
return nil
}
Expand All @@ -151,6 +154,12 @@ func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error)
func (c *mockClient) DeletePod(namespaceID, podID string) error {
return nil
}
func (c *mockClient) ScaleUp(resource, namespaceID, id string) error {
return nil
}
func (c *mockClient) ScaleDown(resource, namespaceID, id string) error {
return nil
}

type mockPipeClient map[string]xfer.Pipe

Expand Down
Loading

0 comments on commit 291c9af

Please sign in to comment.