Skip to content

Commit

Permalink
Merge pull request #3274 from weaveworks/kubernetes-tagger
Browse files Browse the repository at this point in the history
Enable Kubernetes objects to be reported just once in a cluster
  • Loading branch information
bboreham authored Oct 19, 2018
2 parents 1b71a10 + 0c394e6 commit 8cccbb6
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 17 deletions.
3 changes: 2 additions & 1 deletion probe/host/tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func (t Tagger) Tag(r report.Report) (report.Report, error) {

// Explicitly don't tag Endpoints, Addresses and Overlay nodes - These topologies include pseudo nodes,
// and as such do their own host tagging.
for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host, r.Pod} {
// Don't tag Pods so they can be reported centrally.
for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host} {
for _, node := range topology.Nodes {
topology.ReplaceNode(node.WithLatests(metadata).WithParent(report.Host, t.hostNodeID))
}
Expand Down
11 changes: 9 additions & 2 deletions probe/kubernetes/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,15 @@ func isPauseContainer(n report.Node, rpt report.Report) bool {
return false
}

// Tagger adds pod parents to container nodes.
type Tagger struct {
}

// Name of this tagger, for metrics gathering
func (Tagger) Name() string { return "K8s" }

// Tag adds pod parents to container nodes.
func (r *Reporter) Tag(rpt report.Report) (report.Report, error) {
func (r *Tagger) Tag(rpt report.Report) (report.Report, error) {
for id, n := range rpt.Container.Nodes {
uid, ok := n.Latest.Lookup(docker.LabelPrefix + "io.kubernetes.pod.uid")
if !ok {
Expand Down Expand Up @@ -629,7 +636,7 @@ func (r *Reporter) podTopology(services []Service, deployments []Deployment, dae
}

var localPodUIDs map[string]struct{}
if r.nodeName == "" {
if r.nodeName == "" && r.kubeletPort != 0 {
// We don't know the node name: fall back to obtaining the local pods from kubelet
var err error
localPodUIDs, err = GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort))
Expand Down
3 changes: 1 addition & 2 deletions probe/kubernetes/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,7 @@ func TestTagger(t *testing.T) {
docker.LabelPrefix + "io.kubernetes.pod.uid": "123456",
}))

hr := controls.NewDefaultHandlerRegistry()
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr, "", 0).Tag(rpt)
rpt, err := (&kubernetes.Tagger{}).Tag(rpt)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type probeFlags struct {
criEndpoint string

kubernetesEnabled bool
kubernetesRole string
kubernetesNodeName string
kubernetesClientConfig kubernetes.ClientConfig
kubernetesKubeletPort uint
Expand Down Expand Up @@ -314,6 +315,7 @@ func setupFlags(flags *flags) {

// K8s
flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers")
flag.StringVar(&flags.probe.kubernetesRole, "probe.kubernetes.role", "", "host, cluster or blank for everything")
flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)")
flag.StringVar(&flags.probe.kubernetesClientConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority")
flag.StringVar(&flags.probe.kubernetesClientConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS")
Expand All @@ -328,7 +330,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use")
flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server")
flag.StringVar(&flags.probe.kubernetesNodeName, "probe.kubernetes.node-name", "", "Name of this node, for filtering pods")
flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet")
flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet (zero to disable)")

// AWS ECS
flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node")
Expand Down
27 changes: 24 additions & 3 deletions prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ import (
const (
versionCheckPeriod = 6 * time.Hour
defaultServiceHost = "https://cloud.weave.works.:443"

kubernetesRoleHost = "host"
kubernetesRoleCluster = "cluster"
)

var (
Expand Down Expand Up @@ -105,6 +108,21 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
logCensoredArgs()
defer log.Info("probe exiting")

switch flags.kubernetesRole {
case "": // nothing special
case kubernetesRoleHost:
flags.kubernetesEnabled = true
case kubernetesRoleCluster:
flags.kubernetesKubeletPort = 0
flags.kubernetesEnabled = true
flags.spyProcs = false
flags.procEnabled = false
flags.useConntrack = false
flags.useEbpfConn = false
default:
log.Warnf("unrecognized --probe.kubernetes.role: %s", flags.kubernetesRole)
}

if flags.spyProcs && os.Getegid() != 0 {
log.Warn("--probe.proc.spy=true, but that requires root to find everything")
}
Expand Down Expand Up @@ -233,7 +251,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
if flags.dockerEnabled {
// Don't add the bridge in Kubernetes since container IPs are global and
// shouldn't be scoped
if !flags.kubernetesEnabled {
if flags.dockerBridge != "" && !flags.kubernetesEnabled {
if err := report.AddLocalBridge(flags.dockerBridge); err != nil {
log.Errorf("Docker: problem with bridge %s: %v", flags.dockerBridge, err)
}
Expand Down Expand Up @@ -267,19 +285,22 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
}
}

if flags.kubernetesEnabled {
if flags.kubernetesEnabled && flags.kubernetesRole != kubernetesRoleHost {
if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil {
defer client.Stop()
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)
} else {
log.Errorf("Kubernetes: failed to start client: %v", err)
log.Errorf("Kubernetes: make sure to run Scope inside a POD with a service account or provide valid probe.kubernetes.* flags")
}
}

if flags.kubernetesEnabled {
p.AddTagger(&kubernetes.Tagger{})
}

if flags.ecsEnabled {
reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry, flags.ecsClusterRegion, handlerRegistry, probeID)
defer reporter.Stop()
Expand Down
40 changes: 32 additions & 8 deletions render/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,45 @@ var PodRenderer = Memoise(ConditionalRenderer(renderKubernetesTopologies,
},
MakeReduce(
PropagateSingleMetrics(report.Container,
Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID,
chainRenderer: MakeFilter(
ComposeFilterFuncs(
IsRunning,
Complement(isPauseContainer),
),
ContainerWithImageNameRenderer,
)},
MakeMap(propagatePodHost,
Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID,
chainRenderer: MakeFilter(
ComposeFilterFuncs(
IsRunning,
Complement(isPauseContainer),
),
ContainerWithImageNameRenderer,
)},
),
),
ConnectionJoin(MapPod2IP, report.Pod),
KubernetesVolumesRenderer,
),
),
))

// Pods are not tagged with a Host parent, but their container children are.
// If n doesn't already have a host, copy it from one of the children
func propagatePodHost(n report.Node) report.Node {
if n.Topology != report.Pod {
return n
} else if _, found := n.Parents.Lookup(report.Host); found {
return n
}
done := false
n.Children.ForEach(func(child report.Node) {
if !done {
if hosts, found := child.Parents.Lookup(report.Host); found {
for _, h := range hosts {
n = n.WithParent(report.Host, h)
}
done = true
}
}
})
return n
}

// PodServiceRenderer is a Renderer which produces a renderable kubernetes services
// graph by merging the pods graph and the services topology.
//
Expand Down

0 comments on commit 8cccbb6

Please sign in to comment.