From 9962d4ef9ffc31b7cb0b0a24ab9758273f5493e6 Mon Sep 17 00:00:00 2001 From: Pablo Carranza Date: Sun, 13 May 2018 01:02:05 +0200 Subject: [PATCH] Fix concurrency problems, for real this time --- nomad-exporter.go | 71 ++++++++++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/nomad-exporter.go b/nomad-exporter.go index a7cb38e..131401b 100644 --- a/nomad-exporter.go +++ b/nomad-exporter.go @@ -296,6 +296,7 @@ func main() { noEvalMetricsEnabled = flag.Bool("no-eval-metrics", false, "disable eval metrics collection") noDeploymentMetricsEnabled = flag.Bool("no-deployment-metrics", false, "disable deployment metrics collection") noAllocationStatsMetricsEnabled = flag.Bool("no-allocation-stats-metrics", false, "disable stats metrics collection") + concurrency = flag.Int("concurrency", 20, "max number of goroutines to launch concurrently when poking the API") ) flag.Parse() @@ -342,6 +343,7 @@ func main() { EvalMetricsEnabled: !*noEvalMetricsEnabled, DeploymentMetricsEnabled: !*noDeploymentMetricsEnabled, AllocationStatsMetricsEnabled: !*noAllocationStatsMetricsEnabled, + Concurrency: *concurrency, } logrus.Debugf("Created exporter %#v", *exporter) @@ -385,6 +387,7 @@ type Exporter struct { EvalMetricsEnabled bool DeploymentMetricsEnabled bool AllocationStatsMetricsEnabled bool + Concurrency int } func (e *Exporter) shouldReadMetrics() bool { @@ -560,47 +563,47 @@ func (e *Exporter) collectNodes(ch chan<- prometheus.Metric) error { logrus.Debugf("I've the nodes list with %d nodes", len(nodes)) var w sync.WaitGroup - pool := make(chan func(), 10) // Only run 10 at a time + pool := make(chan func(), e.Concurrency) go func() { - f := <-pool - f() + for f := range pool { + go f() + } }() for _, node := range nodes { - state := 1 - drain := strconv.FormatBool(node.Drain) - - ch <- prometheus.MustNewConstMetric( - nodeInfo, prometheus.GaugeValue, 1, - node.Name, node.Version, node.NodeClass, node.Status, - drain, node.Datacenter, node.SchedulingEligibility, - ) + w.Add(1) + pool <- func(node api.NodeListStub) func() { + return func() { + defer w.Done() + state := 1 + drain := strconv.FormatBool(node.Drain) - if node.Status == "down" { - state = 0 - } - ch <- prometheus.MustNewConstMetric( - serfLanMembersStatus, prometheus.GaugeValue, float64(state), - node.Datacenter, node.NodeClass, node.Name, drain, - ) + ch <- prometheus.MustNewConstMetric( + nodeInfo, prometheus.GaugeValue, 1, + node.Name, node.Version, node.NodeClass, node.Status, + drain, node.Datacenter, node.SchedulingEligibility, + ) - if !validVersion(node.Name, node.Version) { - continue - } + if node.Status == "down" { + state = 0 + } + ch <- prometheus.MustNewConstMetric( + serfLanMembersStatus, prometheus.GaugeValue, float64(state), + node.Datacenter, node.NodeClass, node.Name, drain, + ) - if !e.AllocationStatsMetricsEnabled { - continue - } + if !validVersion(node.Name, node.Version) { + return + } - w.Add(1) - pool <- func(a api.NodeListStub) func() { - return func() { - defer w.Done() + if !e.AllocationStatsMetricsEnabled { + return + } - logrus.Debugf("Fetching node %#v", a) - node, _, err := e.client.Nodes().Info(a.ID, opts) + logrus.Debugf("Fetching node %#v", node) + node, _, err := e.client.Nodes().Info(node.ID, opts) if err != nil { - logError(fmt.Errorf("failed to get node %s info: %s", a.Name, err)) + logError(fmt.Errorf("failed to get node %s info: %s", node.Name, err)) return } @@ -648,7 +651,7 @@ func (e *Exporter) collectNodes(ch chan<- prometheus.Metric) error { nodeLabels..., ) - nodeStats, err := e.client.Nodes().Stats(a.ID, opts) + nodeStats, err := e.client.Nodes().Stats(node.ID, opts) if err != nil { logError(fmt.Errorf("failed to get node %s stats: %s", node.Name, err)) return @@ -723,7 +726,7 @@ func (e *Exporter) collectAllocations(ch chan<- prometheus.Metric) error { for _, allocStub := range allocStubs { w.Add(1) - go func(allocStub *api.AllocationListStub) { + go func(allocStub api.AllocationListStub) { defer w.Done() alloc, _, err := e.client.Allocations().Info(allocStub.ID, &api.QueryOptions{ @@ -814,7 +817,7 @@ func (e *Exporter) collectAllocations(ch chan<- prometheus.Metric) error { ) } - }(allocStub) + }(*allocStub) } w.Wait()