Skip to content

Commit

Permalink
Fix concurrency problems, for real this time
Browse files Browse the repository at this point in the history
  • Loading branch information
pcarranza committed May 12, 2018
1 parent c8d8acd commit 9962d4e
Showing 1 changed file with 37 additions and 34 deletions.
71 changes: 37 additions & 34 deletions nomad-exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func main() {
noEvalMetricsEnabled = flag.Bool("no-eval-metrics", false, "disable eval metrics collection")
noDeploymentMetricsEnabled = flag.Bool("no-deployment-metrics", false, "disable deployment metrics collection")
noAllocationStatsMetricsEnabled = flag.Bool("no-allocation-stats-metrics", false, "disable stats metrics collection")
concurrency = flag.Int("concurrency", 20, "max number of goroutines to launch concurrently when poking the API")
)
flag.Parse()

Expand Down Expand Up @@ -342,6 +343,7 @@ func main() {
EvalMetricsEnabled: !*noEvalMetricsEnabled,
DeploymentMetricsEnabled: !*noDeploymentMetricsEnabled,
AllocationStatsMetricsEnabled: !*noAllocationStatsMetricsEnabled,
Concurrency: *concurrency,
}

logrus.Debugf("Created exporter %#v", *exporter)
Expand Down Expand Up @@ -385,6 +387,7 @@ type Exporter struct {
EvalMetricsEnabled bool
DeploymentMetricsEnabled bool
AllocationStatsMetricsEnabled bool
Concurrency int
}

func (e *Exporter) shouldReadMetrics() bool {
Expand Down Expand Up @@ -560,47 +563,47 @@ func (e *Exporter) collectNodes(ch chan<- prometheus.Metric) error {
logrus.Debugf("I've the nodes list with %d nodes", len(nodes))

var w sync.WaitGroup
pool := make(chan func(), 10) // Only run 10 at a time
pool := make(chan func(), e.Concurrency)
go func() {
f := <-pool
f()
for f := range pool {
go f()
}
}()

for _, node := range nodes {
state := 1
drain := strconv.FormatBool(node.Drain)

ch <- prometheus.MustNewConstMetric(
nodeInfo, prometheus.GaugeValue, 1,
node.Name, node.Version, node.NodeClass, node.Status,
drain, node.Datacenter, node.SchedulingEligibility,
)
w.Add(1)
pool <- func(node api.NodeListStub) func() {
return func() {
defer w.Done()
state := 1
drain := strconv.FormatBool(node.Drain)

if node.Status == "down" {
state = 0
}
ch <- prometheus.MustNewConstMetric(
serfLanMembersStatus, prometheus.GaugeValue, float64(state),
node.Datacenter, node.NodeClass, node.Name, drain,
)
ch <- prometheus.MustNewConstMetric(
nodeInfo, prometheus.GaugeValue, 1,
node.Name, node.Version, node.NodeClass, node.Status,
drain, node.Datacenter, node.SchedulingEligibility,
)

if !validVersion(node.Name, node.Version) {
continue
}
if node.Status == "down" {
state = 0
}
ch <- prometheus.MustNewConstMetric(
serfLanMembersStatus, prometheus.GaugeValue, float64(state),
node.Datacenter, node.NodeClass, node.Name, drain,
)

if !e.AllocationStatsMetricsEnabled {
continue
}
if !validVersion(node.Name, node.Version) {
return
}

w.Add(1)
pool <- func(a api.NodeListStub) func() {
return func() {
defer w.Done()
if !e.AllocationStatsMetricsEnabled {
return
}

logrus.Debugf("Fetching node %#v", a)
node, _, err := e.client.Nodes().Info(a.ID, opts)
logrus.Debugf("Fetching node %#v", node)
node, _, err := e.client.Nodes().Info(node.ID, opts)
if err != nil {
logError(fmt.Errorf("failed to get node %s info: %s", a.Name, err))
logError(fmt.Errorf("failed to get node %s info: %s", node.Name, err))
return
}

Expand Down Expand Up @@ -648,7 +651,7 @@ func (e *Exporter) collectNodes(ch chan<- prometheus.Metric) error {
nodeLabels...,
)

nodeStats, err := e.client.Nodes().Stats(a.ID, opts)
nodeStats, err := e.client.Nodes().Stats(node.ID, opts)
if err != nil {
logError(fmt.Errorf("failed to get node %s stats: %s", node.Name, err))
return
Expand Down Expand Up @@ -723,7 +726,7 @@ func (e *Exporter) collectAllocations(ch chan<- prometheus.Metric) error {
for _, allocStub := range allocStubs {
w.Add(1)

go func(allocStub *api.AllocationListStub) {
go func(allocStub api.AllocationListStub) {
defer w.Done()

alloc, _, err := e.client.Allocations().Info(allocStub.ID, &api.QueryOptions{
Expand Down Expand Up @@ -814,7 +817,7 @@ func (e *Exporter) collectAllocations(ch chan<- prometheus.Metric) error {
)
}

}(allocStub)
}(*allocStub)
}

w.Wait()
Expand Down

0 comments on commit 9962d4e

Please sign in to comment.