diff --git a/probe/host/tagger.go b/probe/host/tagger.go index 29e5413243..f466c96e71 100644 --- a/probe/host/tagger.go +++ b/probe/host/tagger.go @@ -30,7 +30,8 @@ func (t Tagger) Tag(r report.Report) (report.Report, error) { // Explicitly don't tag Endpoints, Addresses and Overlay nodes - These topologies include pseudo nodes, // and as such do their own host tagging. - for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host, r.Pod} { + // Don't tag Pods so they can be reported centrally. + for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host} { for _, node := range topology.Nodes { topology.ReplaceNode(node.WithLatests(metadata).WithParent(report.Host, t.hostNodeID)) } diff --git a/probe/kubernetes/reporter.go b/probe/kubernetes/reporter.go index 29493c54ad..76dbdac00c 100644 --- a/probe/kubernetes/reporter.go +++ b/probe/kubernetes/reporter.go @@ -259,8 +259,15 @@ func isPauseContainer(n report.Node, rpt report.Report) bool { return false } +// Tagger adds pod parents to container nodes. +type Tagger struct { +} + +// Name returns the name of this tagger, for metrics gathering. +func (Tagger) Name() string { return "K8s" } + // Tag adds pod parents to container nodes. 
-func (r *Reporter) Tag(rpt report.Report) (report.Report, error) { +func (r *Tagger) Tag(rpt report.Report) (report.Report, error) { for id, n := range rpt.Container.Nodes { uid, ok := n.Latest.Lookup(docker.LabelPrefix + "io.kubernetes.pod.uid") if !ok { @@ -629,7 +636,7 @@ func (r *Reporter) podTopology(services []Service, deployments []Deployment, dae } var localPodUIDs map[string]struct{} - if r.nodeName == "" { + if r.nodeName == "" && r.kubeletPort != 0 { // We don't know the node name: fall back to obtaining the local pods from kubelet var err error localPodUIDs, err = GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort)) diff --git a/probe/kubernetes/reporter_test.go b/probe/kubernetes/reporter_test.go index bc11746cf9..2184d4825c 100644 --- a/probe/kubernetes/reporter_test.go +++ b/probe/kubernetes/reporter_test.go @@ -343,8 +343,7 @@ func TestTagger(t *testing.T) { docker.LabelPrefix + "io.kubernetes.pod.uid": "123456", })) - hr := controls.NewDefaultHandlerRegistry() - rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr, "", 0).Tag(rpt) + rpt, err := (&kubernetes.Tagger{}).Tag(rpt) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/prog/main.go b/prog/main.go index 9630b816bf..13d1ae1a01 100644 --- a/prog/main.go +++ b/prog/main.go @@ -124,6 +124,7 @@ type probeFlags struct { criEndpoint string kubernetesEnabled bool + kubernetesRole string kubernetesNodeName string kubernetesClientConfig kubernetes.ClientConfig kubernetesKubeletPort uint @@ -314,6 +315,7 @@ func setupFlags(flags *flags) { // K8s flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers") + flag.StringVar(&flags.probe.kubernetesRole, "probe.kubernetes.role", "", "host, cluster or blank for everything") flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent 
probe.kubernetes.server)") flag.StringVar(&flags.probe.kubernetesClientConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority") flag.StringVar(&flags.probe.kubernetesClientConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS") @@ -328,7 +330,7 @@ func setupFlags(flags *flags) { flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use") flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server") flag.StringVar(&flags.probe.kubernetesNodeName, "probe.kubernetes.node-name", "", "Name of this node, for filtering pods") - flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet") + flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet (zero to disable)") // AWS ECS flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node") diff --git a/prog/probe.go b/prog/probe.go index 756a5d00bf..c06d6821dd 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -41,6 +41,9 @@ import ( const ( versionCheckPeriod = 6 * time.Hour defaultServiceHost = "https://cloud.weave.works.:443" + + kubernetesRoleHost = "host" + kubernetesRoleCluster = "cluster" ) var ( @@ -105,6 +108,21 @@ func probeMain(flags probeFlags, targets []appclient.Target) { logCensoredArgs() defer log.Info("probe exiting") + switch flags.kubernetesRole { + case "": // nothing special + case kubernetesRoleHost: + flags.kubernetesEnabled = true + case kubernetesRoleCluster: + flags.kubernetesKubeletPort = 0 + flags.kubernetesEnabled = true + flags.spyProcs = false + flags.procEnabled = false + flags.useConntrack = false + 
flags.useEbpfConn = false + default: + log.Warnf("unrecognized --probe.kubernetes.role: %s", flags.kubernetesRole) + } + if flags.spyProcs && os.Getegid() != 0 { log.Warn("--probe.proc.spy=true, but that requires root to find everything") } @@ -233,7 +251,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { if flags.dockerEnabled { // Don't add the bridge in Kubernetes since container IPs are global and // shouldn't be scoped - if !flags.kubernetesEnabled { + if flags.dockerBridge != "" && !flags.kubernetesEnabled { if err := report.AddLocalBridge(flags.dockerBridge); err != nil { log.Errorf("Docker: problem with bridge %s: %v", flags.dockerBridge, err) } @@ -267,19 +285,22 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } } - if flags.kubernetesEnabled { + if flags.kubernetesEnabled && flags.kubernetesRole != kubernetesRoleHost { if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil { defer client.Stop() reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort) defer reporter.Stop() p.AddReporter(reporter) - p.AddTagger(reporter) } else { log.Errorf("Kubernetes: failed to start client: %v", err) log.Errorf("Kubernetes: make sure to run Scope inside a POD with a service account or provide valid probe.kubernetes.* flags") } } + if flags.kubernetesEnabled { + p.AddTagger(&kubernetes.Tagger{}) + } + if flags.ecsEnabled { reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry, flags.ecsClusterRegion, handlerRegistry, probeID) defer reporter.Stop() diff --git a/render/pod.go b/render/pod.go index c08ab0e515..49b97be876 100644 --- a/render/pod.go +++ b/render/pod.go @@ -53,14 +53,16 @@ var PodRenderer = Memoise(ConditionalRenderer(renderKubernetesTopologies, }, MakeReduce( PropagateSingleMetrics(report.Container, - Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID, - chainRenderer: 
MakeFilter( - ComposeFilterFuncs( - IsRunning, - Complement(isPauseContainer), - ), - ContainerWithImageNameRenderer, - )}, + MakeMap(propagatePodHost, + Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID, + chainRenderer: MakeFilter( + ComposeFilterFuncs( + IsRunning, + Complement(isPauseContainer), + ), + ContainerWithImageNameRenderer, + )}, + ), ), ConnectionJoin(MapPod2IP, report.Pod), KubernetesVolumesRenderer, @@ -68,6 +70,28 @@ var PodRenderer = Memoise(ConditionalRenderer(renderKubernetesTopologies, ), )) +// propagatePodHost gives a Pod node a Host parent: Pods are not tagged with a Host parent, but their container children are. +// If n doesn't already have a host, copy the host parents from the first child that has any; non-Pod nodes are returned unchanged. +func propagatePodHost(n report.Node) report.Node { + if n.Topology != report.Pod { + return n + } else if _, found := n.Parents.Lookup(report.Host); found { + return n + } + done := false + n.Children.ForEach(func(child report.Node) { + if !done { + if hosts, found := child.Parents.Lookup(report.Host); found { + for _, h := range hosts { + n = n.WithParent(report.Host, h) + } + done = true + } + } + }) + return n +} + // PodServiceRenderer is a Renderer which produces a renderable kubernetes services // graph by merging the pods graph and the services topology. //