Skip to content

Commit

Permalink
Merge branch 'lock' into concurrent_collect_bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
hexdigest committed Mar 18, 2023
2 parents 24e2472 + 4e06c3e commit e382059
Showing 1 changed file with 70 additions and 54 deletions.
124 changes: 70 additions & 54 deletions exporters/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ var _ metric.Reader = &Exporter{}
type collector struct {
reader metric.Reader

disableTargetInfo bool
withoutUnits bool
targetInfo prometheus.Metric
disableScopeInfo bool
createTargetInfoOnce sync.Once
scopeInfos map[instrumentation.Scope]prometheus.Metric
metricFamilies map[string]*dto.MetricFamily
disableTargetInfo bool
withoutUnits bool
targetInfo prometheus.Metric
disableScopeInfo bool
scopeInfos map[instrumentation.Scope]prometheus.Metric
metricFamilies map[string]*dto.MetricFamily
mu sync.RWMutex
}

// prometheus counters MUST have a _total suffix:
Expand Down Expand Up @@ -122,16 +122,18 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
}
}

c.createTargetInfoOnce.Do(func() {
// Resource should be immutable, we don't need to compute again
targetInfo, err := c.createInfoMetric(targetInfoMetricName, targetInfoDescription, metrics.Resource)
c.mu.Lock()
if c.targetInfo == nil && !c.disableTargetInfo {
targetInfo, err := createInfoMetric(targetInfoMetricName, targetInfoDescription, metrics.Resource)
if err != nil {
// If the target info metric is invalid, disable sending it.
otel.Handle(err)
c.disableTargetInfo = true
}
c.targetInfo = targetInfo
})
}
c.mu.Unlock()

if !c.disableTargetInfo {
ch <- c.targetInfo
}
Expand All @@ -140,6 +142,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
var keys, values [2]string

if !c.disableScopeInfo {
c.mu.Lock()
scopeInfo, ok := c.scopeInfos[scopeMetrics.Scope]
if !ok {
scopeInfo, err = createScopeInfoMetric(scopeMetrics.Scope)
Expand All @@ -148,40 +151,48 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
}
c.scopeInfos[scopeMetrics.Scope] = scopeInfo
}
c.mu.Unlock()

ch <- scopeInfo
keys = scopeInfoKeys
values = [2]string{scopeMetrics.Scope.Name, scopeMetrics.Scope.Version}
}

for _, m := range scopeMetrics.Metrics {
typ, name := c.metricTypeAndName(m)
if typ == nil {
continue
}

drop, help := c.validateMetrics(name, m.Description, typ)
if drop {
continue
}

if help != "" {
m.Description = help
}

switch v := m.Data.(type) {
case metricdata.Histogram[int64]:
addHistogramMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addHistogramMetric(ch, v, m, keys, values, name)
case metricdata.Histogram[float64]:
addHistogramMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addHistogramMetric(ch, v, m, keys, values, name)
case metricdata.Sum[int64]:
addSumMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addSumMetric(ch, v, m, keys, values, name)
case metricdata.Sum[float64]:
addSumMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addSumMetric(ch, v, m, keys, values, name)
case metricdata.Gauge[int64]:
addGaugeMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addGaugeMetric(ch, v, m, keys, values, name)
case metricdata.Gauge[float64]:
addGaugeMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addGaugeMetric(ch, v, m, keys, values, name)
}
}
}
}

func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string) {
// TODO(https://github.com/open-telemetry/opentelemetry-go/issues/3163): support exemplars
drop, help := validateMetrics(name, m.Description, dto.MetricType_HISTOGRAM.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}

for _, dp := range histogram.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)

Expand All @@ -202,24 +213,10 @@ func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogra
}
}

func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string) {
valueType := prometheus.CounterValue
metricType := dto.MetricType_COUNTER
if !sum.IsMonotonic {
valueType = prometheus.GaugeValue
metricType = dto.MetricType_GAUGE
}
if sum.IsMonotonic {
// Add _total suffix for counters
name += counterSuffix
}

drop, help := validateMetrics(name, m.Description, metricType.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}

for _, dp := range sum.DataPoints {
Expand All @@ -235,15 +232,7 @@ func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata
}
}

func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
drop, help := validateMetrics(name, m.Description, dto.MetricType_GAUGE.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}

func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string) {
for _, dp := range gauge.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)

Expand Down Expand Up @@ -291,7 +280,7 @@ func getAttrs(attrs attribute.Set, ks, vs [2]string) ([]string, []string) {
return keys, values
}

func (c *collector) createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) {
func createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) {
keys, values := getAttrs(*res.Set(), [2]string{}, [2]string{})
desc := prometheus.NewDesc(name, description, keys, nil)
return prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(1), values...)
Expand Down Expand Up @@ -381,16 +370,43 @@ func sanitizeName(n string) string {
return b.String()
}

func validateMetrics(name, description string, metricType *dto.MetricType, mfs map[string]*dto.MetricFamily) (drop bool, help string) {
emf, exist := mfs[name]
func (c *collector) metricTypeAndName(m metricdata.Metrics) (*dto.MetricType, string) {
name := c.getName(m)

switch v := m.Data.(type) {
case metricdata.Histogram[int64], metricdata.Histogram[float64]:
return dto.MetricType_HISTOGRAM.Enum(), name
case metricdata.Sum[float64]:
if v.IsMonotonic {
return dto.MetricType_COUNTER.Enum(), name + counterSuffix
}
return dto.MetricType_GAUGE.Enum(), name
case metricdata.Sum[int64]:
if v.IsMonotonic {
return dto.MetricType_COUNTER.Enum(), name + counterSuffix
}
return dto.MetricType_GAUGE.Enum(), name
case metricdata.Gauge[int64], metricdata.Gauge[float64]:
return dto.MetricType_GAUGE.Enum(), name
}

return nil, ""
}

func (c *collector) validateMetrics(name, description string, metricType *dto.MetricType) (drop bool, help string) {
c.mu.Lock()
emf, exist := c.metricFamilies[name]
if !exist {
mfs[name] = &dto.MetricFamily{
c.metricFamilies[name] = &dto.MetricFamily{
Name: proto.String(name),
Help: proto.String(description),
Type: metricType,
}
c.mu.Unlock()
return false, ""
}
c.mu.Unlock()

if emf.GetType() != *metricType {
global.Error(
errors.New("instrument type conflict"),
Expand Down

0 comments on commit e382059

Please sign in to comment.