Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Kubernetes objects to be reported just once in a cluster #3274

Merged
merged 8 commits into from
Oct 19, 2018
3 changes: 2 additions & 1 deletion probe/host/tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func (t Tagger) Tag(r report.Report) (report.Report, error) {

// Explicitly don't tag Endpoints, Addresses and Overlay nodes - These topologies include pseudo nodes,
// and as such do their own host tagging.
for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host, r.Pod} {
// Don't tag Pods so they can be reported centrally.
for _, topology := range []report.Topology{r.Process, r.Container, r.ContainerImage, r.Host} {
for _, node := range topology.Nodes {
topology.ReplaceNode(node.WithLatests(metadata).WithParent(report.Host, t.hostNodeID))
}
Expand Down
11 changes: 9 additions & 2 deletions probe/kubernetes/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,15 @@ func isPauseContainer(n report.Node, rpt report.Report) bool {
return false
}

// Tagger adds pod parents to container nodes.
type Tagger struct {
}

// Name of this tagger, for metrics gathering
func (Tagger) Name() string { return "K8s" }

// Tag adds pod parents to container nodes.
func (r *Reporter) Tag(rpt report.Report) (report.Report, error) {
func (r *Tagger) Tag(rpt report.Report) (report.Report, error) {
for id, n := range rpt.Container.Nodes {
uid, ok := n.Latest.Lookup(docker.LabelPrefix + "io.kubernetes.pod.uid")
if !ok {
Expand Down Expand Up @@ -554,7 +561,7 @@ func (r *Reporter) podTopology(services []Service, deployments []Deployment, dae
}

var localPodUIDs map[string]struct{}
if r.nodeName == "" {
if r.nodeName == "" && r.kubeletPort != 0 {
// We don't know the node name: fall back to obtaining the local pods from kubelet
var err error
localPodUIDs, err = GetLocalPodUIDs(fmt.Sprintf("127.0.0.1:%d", r.kubeletPort))

This comment was marked as abuse.

Expand Down
3 changes: 1 addition & 2 deletions probe/kubernetes/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,7 @@ func TestTagger(t *testing.T) {
docker.LabelPrefix + "io.kubernetes.pod.uid": "123456",
}))

hr := controls.NewDefaultHandlerRegistry()
rpt, err := kubernetes.NewReporter(newMockClient(), nil, "", "", nil, hr, "", 0).Tag(rpt)
rpt, err := (&kubernetes.Tagger{}).Tag(rpt)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type probeFlags struct {
criEndpoint string

kubernetesEnabled bool
kubernetesRole string
kubernetesNodeName string
kubernetesClientConfig kubernetes.ClientConfig
kubernetesKubeletPort uint
Expand Down Expand Up @@ -314,6 +315,7 @@ func setupFlags(flags *flags) {

// K8s
flag.BoolVar(&flags.probe.kubernetesEnabled, "probe.kubernetes", false, "collect kubernetes-related attributes for containers")
flag.StringVar(&flags.probe.kubernetesRole, "probe.kubernetes.role", "", "host, cluster or blank for everything")
flag.StringVar(&flags.probe.kubernetesClientConfig.Server, "probe.kubernetes.api", "", "The address and port of the Kubernetes API server (deprecated in favor of equivalent probe.kubernetes.server)")
flag.StringVar(&flags.probe.kubernetesClientConfig.CertificateAuthority, "probe.kubernetes.certificate-authority", "", "Path to a cert. file for the certificate authority")
flag.StringVar(&flags.probe.kubernetesClientConfig.ClientCertificate, "probe.kubernetes.client-certificate", "", "Path to a client certificate file for TLS")
Expand All @@ -328,7 +330,7 @@ func setupFlags(flags *flags) {
flag.StringVar(&flags.probe.kubernetesClientConfig.User, "probe.kubernetes.user", "", "The name of the kubeconfig user to use")
flag.StringVar(&flags.probe.kubernetesClientConfig.Username, "probe.kubernetes.username", "", "Username for basic authentication to the API server")
flag.StringVar(&flags.probe.kubernetesNodeName, "probe.kubernetes.node-name", "", "Name of this node, for filtering pods")
flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet")
flag.UintVar(&flags.probe.kubernetesKubeletPort, "probe.kubernetes.kubelet-port", 10255, "Node-local TCP port for contacting kubelet (zero to disable)")

This comment was marked as abuse.

This comment was marked as abuse.


// AWS ECS
flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node")
Expand Down
27 changes: 24 additions & 3 deletions prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ import (
const (
versionCheckPeriod = 6 * time.Hour
defaultServiceHost = "https://cloud.weave.works.:443"

kubernetesRoleHost = "host"
kubernetesRoleCluster = "cluster"
)

var (
Expand Down Expand Up @@ -105,6 +108,21 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
logCensoredArgs()
defer log.Info("probe exiting")

switch flags.kubernetesRole {
case "": // nothing special
case kubernetesRoleHost:
flags.kubernetesEnabled = true
case kubernetesRoleCluster:
flags.kubernetesKubeletPort = 0
flags.kubernetesEnabled = true
flags.spyProcs = false
flags.procEnabled = false
flags.useConntrack = false
flags.useEbpfConn = false
default:
log.Warnf("unrecognized --probe.kubernetes.role: %s", flags.kubernetesRole)
}

if flags.spyProcs && os.Getegid() != 0 {
log.Warn("--probe.proc.spy=true, but that requires root to find everything")
}
Expand Down Expand Up @@ -233,7 +251,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
if flags.dockerEnabled {
// Don't add the bridge in Kubernetes since container IPs are global and
// shouldn't be scoped
if !flags.kubernetesEnabled {
if flags.dockerBridge != "" && !flags.kubernetesEnabled {
if err := report.AddLocalBridge(flags.dockerBridge); err != nil {
log.Errorf("Docker: problem with bridge %s: %v", flags.dockerBridge, err)
}
Expand Down Expand Up @@ -267,19 +285,22 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
}
}

if flags.kubernetesEnabled {
if flags.kubernetesEnabled && flags.kubernetesRole != kubernetesRoleHost {
if client, err := kubernetes.NewClient(flags.kubernetesClientConfig); err == nil {
defer client.Stop()
reporter := kubernetes.NewReporter(client, clients, probeID, hostID, p, handlerRegistry, flags.kubernetesNodeName, flags.kubernetesKubeletPort)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)
} else {
log.Errorf("Kubernetes: failed to start client: %v", err)
log.Errorf("Kubernetes: make sure to run Scope inside a POD with a service account or provide valid probe.kubernetes.* flags")
}
}

if flags.kubernetesEnabled {
p.AddTagger(&kubernetes.Tagger{})
}

if flags.ecsEnabled {
reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry, flags.ecsClusterRegion, handlerRegistry, probeID)
defer reporter.Stop()
Expand Down
40 changes: 32 additions & 8 deletions render/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,45 @@ var PodRenderer = Memoise(ConditionalRenderer(renderKubernetesTopologies,
},
MakeReduce(
PropagateSingleMetrics(report.Container,
Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID,
chainRenderer: MakeFilter(
ComposeFilterFuncs(
IsRunning,
Complement(isPauseContainer),
),
ContainerWithImageNameRenderer,
)},
MakeMap(propagatePodHost,
Map2Parent{topologies: []string{report.Pod}, noParentsPseudoID: UnmanagedID,
chainRenderer: MakeFilter(
ComposeFilterFuncs(
IsRunning,
Complement(isPauseContainer),
),
ContainerWithImageNameRenderer,
)},
),
),
ConnectionJoin(MapPod2IP, report.Pod),
KubernetesVolumesRenderer,
),
),
))

// Pods are not tagged with a Host parent, but their container children are.
// If n doesn't already have a host, copy it from one of the children
func propagatePodHost(n report.Node) report.Node {
if n.Topology != report.Pod {
return n
} else if _, found := n.Parents.Lookup(report.Host); found {
return n
}
done := false
n.Children.ForEach(func(child report.Node) {
if !done {
if hosts, found := child.Parents.Lookup(report.Host); found {
for _, h := range hosts {
n = n.WithParent(report.Host, h)
}
done = true
}
}
})
return n
}

// PodServiceRenderer is a Renderer which produces a renderable kubernetes services
// graph by merging the pods graph and the services topology.
//
Expand Down