diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 76ec6142c3..8aee202d9a 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -24,18 +24,22 @@ type Client interface { WalkServices(f func(Service) error) error WalkDeployments(f func(Deployment) error) error WalkReplicaSets(f func(ReplicaSet) error) error + WalkReplicationControllers(f func(ReplicationController) error) error WalkNodes(f func(*api.Node) error) error WatchPods(f func(Event, Pod)) GetLogs(namespaceID, podID string) (io.ReadCloser, error) DeletePod(namespaceID, podID string) error + ScaleUp(resource, namespaceID, id string) error + ScaleDown(resource, namespaceID, id string) error } type client struct { quit chan struct{} resyncPeriod time.Duration client *unversioned.Client + extensionsClient *unversioned.ExtensionsClient podStore *cache.StoreToPodLister serviceStore *cache.StoreToServiceLister deploymentStore *cache.StoreToDeploymentLister @@ -85,9 +89,10 @@ func NewClient(addr string, resyncPeriod time.Duration) (Client, error) { } result := &client{ - quit: make(chan struct{}), - resyncPeriod: resyncPeriod, - client: c, + quit: make(chan struct{}), + resyncPeriod: resyncPeriod, + client: c, + extensionsClient: ec, } result.podStore = &cache.StoreToPodLister{Store: result.setupStore(c, "pods", &api.Pod{})} @@ -159,30 +164,31 @@ func (c *client) WalkDeployments(f func(Deployment) error) error { return nil } -// WalkReplicaSets calls f for each replica set (and replication controller) +// WalkReplicaSets calls f for each replica set func (c *client) WalkReplicaSets(f func(ReplicaSet) error) error { - { - list, err := c.replicaSetStore.List() - if err != nil { + list, err := c.replicaSetStore.List() + if err != nil { + return err + } + for i := range list { + if err := f(NewReplicaSet(&(list[i]))); err != nil { return err } - for i := range list { - if err := f(NewReplicaSet(&(list[i]))); err != nil { - return err - } - } } + return nil - { - list, err := c.replicationControllerStore.List() - if err != nil { +} + +// WalkReplicationcontrollers calls f for each replication controller +func (c *client) WalkReplicationControllers(f func(ReplicationController) error) error { + list, err := c.replicationControllerStore.List() + if err != nil { + return err + } + for i := range list { + if err := f(NewReplicationController(&(list[i]))); err != nil { return err } - for i := range list { - if err := f(NewReplicationController(&(list[i]))); err != nil { - return err - } - } } return nil } @@ -219,6 +225,29 @@ func (c *client) DeletePod(namespaceID, podID string) error { Resource("pods").Do().Error() } +func (c *client) ScaleUp(resource, namespaceID, id string) error { + return c.modifyScale(resource, namespaceID, id, func(scale *extensions.Scale) { + scale.Spec.Replicas++ + }) +} + +func (c *client) ScaleDown(resource, namespaceID, id string) error { + return c.modifyScale(resource, namespaceID, id, func(scale *extensions.Scale) { + scale.Spec.Replicas-- + }) +} + +func (c *client) modifyScale(resource, namespace, id string, f func(*extensions.Scale)) error { + scaler := c.extensionsClient.Scales(namespace) + scale, err := scaler.Get(resource, id) + if err != nil { + return err + } + f(scale) + _, err = scaler.Update(resource, scale) + return err +} + func (c *client) Stop() { close(c.quit) } diff --git a/probe/kubernetes/controls.go b/probe/kubernetes/controls.go index a7c40349c2..9eab252c9a 100644 --- a/probe/kubernetes/controls.go +++ b/probe/kubernetes/controls.go @@ -13,6 +13,8 @@ import ( const ( GetLogs = "kubernetes_get_logs" DeletePod = "kubernetes_delete_pod" + ScaleUp = "kubernetes_scale_up" + ScaleDown = "kubernetes_scale_down" ) // GetLogs is the control to get the logs for a kubernetes pod @@ -72,12 +74,85 @@ func (r *Reporter) CapturePod(f func(xfer.Request, string, string) xfer.Response } } +// CaptureResource is exported for testing +func (r *Reporter) CaptureResource(f func(xfer.Request, string, string, string) xfer.Response) func(xfer.Request) xfer.Response { + return func(req xfer.Request) xfer.Response { + var resource, uid string + for _, parser := range []struct { + res string + f func(string) (string, bool) + }{ + {report.Deployment, report.ParseDeploymentNodeID}, + {report.ReplicaSet, report.ParseReplicaSetNodeID}, + } { + if u, ok := parser.f(req.NodeID); ok { + resource, uid = parser.res, u + break + } + } + if resource == "" { + return xfer.ResponseErrorf("Invalid ID: %s", req.NodeID) + } + + switch resource { + case report.Deployment: + var deployment Deployment + r.client.WalkDeployments(func(d Deployment) error { + if d.UID() == uid { + deployment = d + } + return nil + }) + if deployment != nil { + return f(req, "deployment", deployment.Namespace(), deployment.Name()) + } + case report.ReplicaSet: + var replicaSet ReplicaSet + var res string + r.client.WalkReplicaSets(func(r ReplicaSet) error { + if r.UID() == uid { + replicaSet = r + res = "replicaset" + } + return nil + }) + if replicaSet == nil { + r.client.WalkReplicationControllers(func(r ReplicationController) error { + if r.UID() == uid { + replicaSet = ReplicaSet(r) + res = "replicationcontroller" + } + return nil + }) + } + if replicaSet != nil { + return f(req, res, replicaSet.Namespace(), replicaSet.Name()) + } + } + return xfer.ResponseErrorf("%s not found: %s", resource, uid) + } +} + +// ScaleUp is the control to scale up a deployment +func (r *Reporter) ScaleUp(req xfer.Request, resource, namespace, id string) xfer.Response { + return xfer.ResponseError(r.client.ScaleUp(resource, namespace, id)) +} + +// ScaleDown is the control to scale up a deployment +func (r *Reporter) ScaleDown(req xfer.Request, resource, namespace, id string) xfer.Response { + return xfer.ResponseError(r.client.ScaleDown(resource, namespace, id)) +} + func (r *Reporter) registerControls() { controls.Register(GetLogs, r.CapturePod(r.GetLogs)) controls.Register(DeletePod, r.CapturePod(r.deletePod)) + controls.Register(ScaleUp, r.CaptureResource(r.ScaleUp)) + controls.Register(ScaleDown, r.CaptureResource(r.ScaleDown)) } func (r *Reporter) deregisterControls() { controls.Rm(GetLogs) controls.Rm(DeletePod) + controls.Rm(ScaleUp) + controls.Rm(ScaleDown) } diff --git a/probe/kubernetes/deployment.go b/probe/kubernetes/deployment.go index 513e3a3a8b..965b812f3c 100644 --- a/probe/kubernetes/deployment.go +++ b/probe/kubernetes/deployment.go @@ -55,5 +55,5 @@ func (d *deployment) GetNode(probeID string) report.Node { UnavailableReplicas: fmt.Sprint(d.Status.UnavailableReplicas), Strategy: string(d.Spec.Strategy.Type), report.ControlProbeID: probeID, - }) + }).WithControls(ScaleUp, ScaleDown) } diff --git a/probe/kubernetes/replica_set.go b/probe/kubernetes/replica_set.go index 182b3d6659..eafbebe693 100644 --- a/probe/kubernetes/replica_set.go +++ b/probe/kubernetes/replica_set.go @@ -59,5 +59,5 @@ func (r *replicaSet) GetNode(probeID string) report.Node { DesiredReplicas: fmt.Sprint(r.Spec.Replicas), FullyLabeledReplicas: fmt.Sprint(r.Status.FullyLabeledReplicas), report.ControlProbeID: probeID, - }).WithParents(r.parents) + }).WithParents(r.parents).WithControls(ScaleUp, ScaleDown) } diff --git a/probe/kubernetes/replication_controller.go b/probe/kubernetes/replication_controller.go index 09bd32eed7..14350f5a6e 100644 --- a/probe/kubernetes/replication_controller.go +++ b/probe/kubernetes/replication_controller.go @@ -8,6 +8,14 @@ import ( "k8s.io/kubernetes/pkg/labels" ) +// ReplicationController represents a Kubernetes replication controller +type ReplicationController interface { + Meta + Selector() labels.Selector + AddParent(topology, id string) + GetNode(probeID string) report.Node +} + type replicationController struct { *api.ReplicationController Meta @@ -16,7 +24,7 @@ type replicationController struct { } // NewReplicationController creates a new ReplicationController -func NewReplicationController(r *api.ReplicationController) ReplicaSet { +func NewReplicationController(r *api.ReplicationController) ReplicationController { return &replicationController{ ReplicationController: r, Meta: meta{r.ObjectMeta}, @@ -42,5 +50,5 @@ func (r *replicationController) GetNode(probeID string) report.Node { DesiredReplicas: fmt.Sprint(r.Spec.Replicas), FullyLabeledReplicas: fmt.Sprint(r.Status.FullyLabeledReplicas), report.ControlProbeID: probeID, - }).WithParents(r.parents) + }).WithParents(r.parents).WithControls(ScaleUp, ScaleDown) } diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index ae0b5b6aca..10de1ce03e 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -65,6 +65,21 @@ var ( TableTemplates = report.TableTemplates{ LabelPrefix: {ID: LabelPrefix, Label: "Kubernetes Labels", Prefix: LabelPrefix}, } + + ScalingControls = []report.Control{ + { + ID: ScaleDown, + Human: "Scale Down", + Icon: "fa-minus", + Rank: 0, + }, + { + ID: ScaleUp, + Human: "Scale Up", + Icon: "fa-plus", + Rank: 1, + }, + } ) // Reporter generate Reports containing Container and ContainerImage topologies @@ -206,6 +221,8 @@ func (r *Reporter) deploymentTopology(probeID string) (report.Topology, []Deploy WithTableTemplates(TableTemplates) deployments = []Deployment{} ) + result.Controls.AddControls(ScalingControls) + err := r.client.WalkDeployments(func(d Deployment) error { result = result.AddNode(d.GetNode(probeID)) deployments = append(deployments, d) @@ -222,6 +239,8 @@ func (r *Reporter) replicaSetTopology(probeID string, deployments []Deployment) replicaSets = []ReplicaSet{} selectors = []func(labelledChild){} ) + result.Controls.AddControls(ScalingControls) + for _, deployment := range deployments { selectors = append(selectors, match( deployment.Selector(), @@ -238,6 +257,18 @@ func (r *Reporter) replicaSetTopology(probeID string, deployments []Deployment) replicaSets = append(replicaSets, r) return nil }) + if err != nil { + return result, replicaSets, err + } + + err = r.client.WalkReplicationControllers(func(r ReplicationController) error { + for _, selector := range selectors { + selector(r) + } + result = result.AddNode(r.GetNode(probeID)) + replicaSets = append(replicaSets, ReplicaSet(r)) + return nil + }) return result, replicaSets, err } diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index 1a96e999d5..d9e10f8ee8 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -137,6 +137,9 @@ func (c *mockClient) WalkDeployments(f func(kubernetes.Deployment) error) error func (c *mockClient) WalkReplicaSets(f func(kubernetes.ReplicaSet) error) error { return nil } +func (c *mockClient) WalkReplicationControllers(f func(kubernetes.ReplicationController) error) error { + return nil +} func (*mockClient) WalkNodes(f func(*api.Node) error) error { return nil } @@ -151,6 +154,12 @@ func (c *mockClient) GetLogs(namespaceID, podName string) (io.ReadCloser, error) func (c *mockClient) DeletePod(namespaceID, podID string) error { return nil } +func (c *mockClient) ScaleUp(resource, namespaceID, id string) error { + return nil +} +func (c *mockClient) ScaleDown(resource, namespaceID, id string) error { + return nil +} type mockPipeClient map[string]xfer.Pipe diff --git a/report/id.go b/report/id.go index 8a364e6838..7fb94a0b78 100644 --- a/report/id.go +++ b/report/id.go @@ -98,34 +98,66 @@ func MakeProcessNodeID(hostID, pid string) string { var ( // MakeHostNodeID produces a host node ID from its composite parts. - MakeHostNodeID = singleComponentID("host") + MakeHostNodeID = makeSingleComponentID("host") + + // ParseHostNodeID parses a host node ID + ParseHostNodeID = parseSingleComponentID("host") // MakeContainerNodeID produces a container node ID from its composite parts. - MakeContainerNodeID = singleComponentID("container") + MakeContainerNodeID = makeSingleComponentID("container") + + // ParseContainerNodeID parses a container node ID + ParseContainerNodeID = parseSingleComponentID("container") // MakeContainerImageNodeID produces a container image node ID from its composite parts. - MakeContainerImageNodeID = singleComponentID("container_image") + MakeContainerImageNodeID = makeSingleComponentID("container_image") + + // ParseContainerImageNodeID parses a container image node ID + ParseContainerImageNodeID = parseSingleComponentID("container_image") // MakePodNodeID produces a pod node ID from its composite parts. - MakePodNodeID = singleComponentID("pod") + MakePodNodeID = makeSingleComponentID("pod") + + // ParsePodNodeID parses a pod node ID + ParsePodNodeID = parseSingleComponentID("pod") // MakeServiceNodeID produces a service node ID from its composite parts. - MakeServiceNodeID = singleComponentID("service") + MakeServiceNodeID = makeSingleComponentID("service") + + // ParseServiceNodeID parses a service node ID + ParseServiceNodeID = parseSingleComponentID("service") // MakeDeploymentNodeID produces a deployment node ID from its composite parts. - MakeDeploymentNodeID = singleComponentID("deployment") + MakeDeploymentNodeID = makeSingleComponentID("deployment") + + // ParseDeploymentNodeID parses a deployment node ID + ParseDeploymentNodeID = parseSingleComponentID("deployment") // MakeReplicaSetNodeID produces a replica set node ID from its composite parts. - MakeReplicaSetNodeID = singleComponentID("replica_set") + MakeReplicaSetNodeID = makeSingleComponentID("replica_set") + + // ParseReplicaSetNodeID parses a replica set node ID + ParseReplicaSetNodeID = parseSingleComponentID("replica_set") ) -// singleComponentID makes a -func singleComponentID(tag string) func(string) string { +// makeSingleComponentID makes a single-component node id encoder +func makeSingleComponentID(tag string) func(string) string { return func(id string) string { return id + ScopeDelim + "<" + tag + ">" } } +// parseSingleComponentID makes a single-component node id decoder +func parseSingleComponentID(tag string) func(string) (string, bool) { + return func(id string) (string, bool) { + fields := strings.SplitN(id, ScopeDelim, 2) + if len(fields) != 2 || fields[1] != "<"+tag+">" { + return "", false + } + return fields[0], true + } +} + // MakeOverlayNodeID produces an overlay topology node ID from a router peer's // name, which is assumed to be globally unique. func MakeOverlayNodeID(peerName string) string { @@ -153,15 +185,6 @@ func ParseEndpointNodeID(endpointNodeID string) (hostID, address, port string, o return fields[0], fields[1], fields[2], true } -// ParseContainerNodeID produces the container id from an container node ID. -func ParseContainerNodeID(containerNodeID string) (containerID string, ok bool) { - fields := strings.SplitN(containerNodeID, ScopeDelim, 2) - if len(fields) != 2 || fields[1] != "" { - return "", false - } - return fields[0], true -} - // ParseAddressNodeID produces the host ID, address from an address node ID. func ParseAddressNodeID(addressNodeID string) (hostID, address string, ok bool) { fields := strings.SplitN(addressNodeID, ScopeDelim, 2) @@ -171,15 +194,6 @@ func ParseAddressNodeID(addressNodeID string) (hostID, address string, ok bool) return fields[0], fields[1], true } -// ParsePodNodeID produces the namespace ID and pod ID from an pod node ID. -func ParsePodNodeID(podNodeID string) (uid string, ok bool) { - fields := strings.SplitN(podNodeID, ScopeDelim, 2) - if len(fields) != 2 || fields[1] != "" { - return "", false - } - return fields[0], true -} - // ExtractHostID extracts the host id from Node func ExtractHostID(m Node) string { hostNodeID, _ := m.Latest.Lookup(HostNodeID)