Skip to content

Commit

Permalink
Make the Go 1.17 collector thread-safe (#969)
Browse files Browse the repository at this point in the history
  • Loading branch information
mknyszek authored and kakkoyun committed May 13, 2022
1 parent 08a53e5 commit 772b893
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
8 changes: 8 additions & 0 deletions prometheus/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,11 @@ func (c *selfCollector) Describe(ch chan<- *Desc) {
func (c *selfCollector) Collect(ch chan<- Metric) {
ch <- c.self
}

// collectorMetric is a metric that is also a collector.
// Because of selfCollector, most (if not all) Metrics in
// this package are also collectors.
type collectorMetric interface {
Metric
Collector
}
32 changes: 25 additions & 7 deletions prometheus/go_collector_go117.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ type goCollector struct {
base baseGoCollector

// rm... fields all pertain to the runtime/metrics package.
rmSampleMu sync.Mutex
rmSampleBuf []metrics.Sample
rmSampleMap map[string]*metrics.Sample
rmMetrics []Metric
rmMetrics []collectorMetric

// With Go 1.17, the runtime/metrics package was introduced.
// From that point on, metric names produced by the runtime/metrics
Expand All @@ -58,7 +59,7 @@ func NewGoCollector() Collector {
}

// Generate a Desc and ValueType for each runtime/metrics metric.
metricSet := make([]Metric, 0, len(descriptions))
metricSet := make([]collectorMetric, 0, len(descriptions))
sampleBuf := make([]metrics.Sample, 0, len(descriptions))
sampleMap := make(map[string]*metrics.Sample, len(descriptions))
for i := range descriptions {
Expand All @@ -76,7 +77,7 @@ func NewGoCollector() Collector {
sampleBuf = append(sampleBuf, metrics.Sample{Name: d.Name})
sampleMap[d.Name] = &sampleBuf[len(sampleBuf)-1]

var m Metric
var m collectorMetric
if d.Kind == metrics.KindFloat64Histogram {
_, hasSum := rmExactSumMap[d.Name]
m = newBatchHistogram(
Expand Down Expand Up @@ -130,9 +131,19 @@ func (c *goCollector) Collect(ch chan<- Metric) {
// Collect base non-memory metrics.
c.base.Collect(ch)

// Collect must be thread-safe, so prevent concurrent use of
// rmSampleBuf. Just read into rmSampleBuf but write all the data
// we get into our Metrics or MemStats.
//
// Note that we cannot simply read and then clone rmSampleBuf
// because we'd need to perform a deep clone of it, which is likely
// not worth it.
c.rmSampleMu.Lock()

// Populate runtime/metrics sample buffer.
metrics.Read(c.rmSampleBuf)

// Update all our metrics from rmSampleBuf.
for i, sample := range c.rmSampleBuf {
// N.B. switch on concrete type because it's significantly more efficient
// than checking for the Counter and Gauge interface implementations. In
Expand All @@ -146,22 +157,29 @@ func (c *goCollector) Collect(ch chan<- Metric) {
if v1 > v0 {
m.Add(unwrapScalarRMValue(sample.Value) - m.get())
}
m.Collect(ch)
case *gauge:
m.Set(unwrapScalarRMValue(sample.Value))
m.Collect(ch)
case *batchHistogram:
m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
m.Collect(ch)
default:
panic("unexpected metric type")
}
}

// ms is a dummy MemStats that we populate ourselves so that we can
// populate the old metrics from it.
var ms runtime.MemStats
memStatsFromRM(&ms, c.rmSampleMap)

c.rmSampleMu.Unlock()

// Export all the metrics to ch.
// At this point we must not access rmSampleBuf or rmSampleMap, because
// a concurrent caller could use it. It's safe to Collect all our Metrics,
// however, because they're updated in a thread-safe way while MemStats
// is local to this call of Collect.
for _, m := range c.rmMetrics {
m.Collect(ch)
}
for _, i := range c.msMetrics {
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
}
Expand Down
22 changes: 22 additions & 0 deletions prometheus/go_collector_go117_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,25 @@ func TestExpectedRuntimeMetrics(t *testing.T) {
t.Log("where X is the Go version you are currently using")
}
}

func TestGoCollectorConcurrency(t *testing.T) {
c := NewGoCollector().(*goCollector)

// Set up multiple goroutines to Collect from the
// same GoCollector. In race mode with GOMAXPROCS > 1,
// this test should fail often if Collect is not
// concurrent-safe.
for i := 0; i < 4; i++ {
go func() {
ch := make(chan Metric)
go func() {
// Drain all metrics recieved until the
// channel is closed.
for range ch {
}
}()
c.Collect(ch)
close(ch)
}()
}
}

0 comments on commit 772b893

Please sign in to comment.