metrics-generator: add custom registry #1340

Merged · 15 commits · Mar 25, 2022
Changes from 13 commits
13 changes: 7 additions & 6 deletions integration/e2e/config-metrics-generator.yaml
@@ -16,17 +16,18 @@ ingester:
replication_factor: 1

metrics_generator:
collection_interval: 1s
storage:
path: /var/tempo
remote_write:
- url: http://tempo_e2e-prometheus:9090/api/v1/write
send_exemplars: true
processor:
service_graphs:
histogram_buckets: [1, 2] # seconds
span_metrics:
histogram_buckets: [1, 2] # seconds
registry:
collection_interval: 1s
storage:
path: /var/tempo
remote_write:
- url: http://tempo_e2e-prometheus:9090/api/v1/write
send_exemplars: true

storage:
trace:
6 changes: 6 additions & 0 deletions integration/e2e/metrics_generator_test.go
@@ -151,6 +151,12 @@ func TestMetricsGenerator(t *testing.T) {
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_bucket", append(lbls, "le", "+Inf")))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_count", lbls))
assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_duration_seconds_sum", lbls))

// Verify metrics
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_spans_received_total"))

assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(23), "tempo_metrics_generator_registry_active_series"))
assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(23), "tempo_metrics_generator_registry_series_added_total"))
}

func newPrometheus() *e2e.HTTPService {
25 changes: 5 additions & 20 deletions modules/generator/config.go
@@ -2,10 +2,10 @@ package generator

import (
"flag"
"time"

"github.com/grafana/tempo/modules/generator/processor/servicegraphs"
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/modules/generator/storage"
)

@@ -19,32 +19,17 @@ const (

// Config for a generator.
type Config struct {
Ring RingConfig `yaml:"ring"`

Ring RingConfig `yaml:"ring"`
Processor ProcessorConfig `yaml:"processor"`

// CollectionInterval controls how often to collect and remote write metrics.
// Defaults to 15s.
CollectionInterval time.Duration `yaml:"collection_interval"`

// ExternalLabels are added to any time-series exported by this instance.
ExternalLabels map[string]string `yaml:"external_labels,omitempty"`

// Add a label `tempo_instance_id` to every metric. This is necessary when running multiple
// instances of the metrics-generator as each instance will push the same time series.
AddInstanceIDLabel bool `yaml:"add_instance_id_label"`

Storage storage.Config `yaml:"storage"`
Registry registry.Config `yaml:"registry"`
Storage storage.Config `yaml:"storage"`
}

// RegisterFlagsAndApplyDefaults registers the flags.
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
cfg.Ring.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.Processor.RegisterFlagsAndApplyDefaults(prefix, f)

cfg.CollectionInterval = 15 * time.Second
cfg.AddInstanceIDLabel = true

cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f)
}

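The top-level CollectionInterval, ExternalLabels, and AddInstanceIDLabel settings are gone; the generator Config now simply delegates to the Registry and Storage sub-configs, each of which applies its own defaults. Below is a rough sketch of what the new registry.Config could look like under that pattern; the field set and the 15-second default are assumptions based on the e2e config change above and the mock overrides in instance_test.go, not the actual definition from this PR.

package registry

import (
	"flag"
	"time"
)

// Config is a sketch of the registry configuration; only collection_interval
// is visible in the e2e config above, the remaining fields are assumed.
type Config struct {
	// CollectionInterval controls how often active series are collected
	// and appended to storage.
	CollectionInterval time.Duration `yaml:"collection_interval"`

	// ExternalLabels are added to every exported time series.
	ExternalLabels map[string]string `yaml:"external_labels,omitempty"`
}

// RegisterFlagsAndApplyDefaults follows the same delegation pattern as the
// generator Config: each sub-config owns its defaults.
func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
	c.CollectionInterval = 15 * time.Second
}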
31 changes: 1 addition & 30 deletions modules/generator/generator.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -62,13 +61,6 @@ func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Regist
return nil, errors.New("must configure metrics_generator.storage.path")
}

if cfg.AddInstanceIDLabel {
if cfg.ExternalLabels == nil {
cfg.ExternalLabels = make(map[string]string)
}
cfg.ExternalLabels["tempo_instance_id"] = cfg.Ring.InstanceID
}

g := &Generator{
cfg: cfg,
overrides: overrides,
@@ -146,14 +138,8 @@ func (g *Generator) starting(ctx context.Context) (err error) {
}

func (g *Generator) running(ctx context.Context) error {
collectMetricsTicker := time.NewTicker(g.cfg.CollectionInterval)
defer collectMetricsTicker.Stop()

for {
select {
case <-collectMetricsTicker.C:
g.collectMetrics()

case <-ctx.Done():
return nil

@@ -179,7 +165,7 @@ func (g *Generator) stopping(_ error) error {

for _, inst := range g.instances {
go func(inst *instance) {
inst.shutdown(context.Background())
inst.shutdown()
wg.Done()
}(inst)
}
@@ -257,21 +243,6 @@ func (g *Generator) createInstance(id string) (*instance, error) {
return newInstance(g.cfg, id, g.overrides, wal, g.reg, g.logger)
}

func (g *Generator) collectMetrics() {
ctx, cancel := context.WithTimeout(context.Background(), g.cfg.CollectionInterval)
defer cancel()

span, ctx := opentracing.StartSpanFromContext(ctx, "generator.collectMetrics")
defer span.Finish()

for _, instance := range g.instances {
err := instance.collectMetrics(ctx)
if err != nil {
level.Error(g.logger).Log("msg", "collecting metrics failed", "tenant", instance.instanceID, "err", err)
}
}
}

func (g *Generator) CheckReady(_ context.Context) error {
if !g.ringLifecycler.IsRegistered() {
return fmt.Errorf("metrics-generator check ready failed: not registered in the ring")
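With the ticker and collectMetrics removed, Generator.running now only blocks on ctx.Done(); periodic collection is expected to be driven from within the new per-tenant registry instead. The following is a minimal, self-contained sketch of that kind of ticker-driven collection loop, offered purely as an illustration of the pattern rather than the ManagedRegistry implementation.

package main

import (
	"context"
	"fmt"
	"time"
)

// collectLoop periodically invokes collect until ctx is cancelled, roughly
// the responsibility that moves out of Generator.running and into the
// per-tenant registry.
func collectLoop(ctx context.Context, interval time.Duration, collect func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			collect(ctx)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	collectLoop(ctx, 100*time.Millisecond, func(context.Context) {
		fmt.Println("collecting active series")
	})
}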
57 changes: 14 additions & 43 deletions modules/generator/instance.go
@@ -9,14 +9,13 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/tracing"

"github.com/grafana/tempo/modules/generator/processor"
"github.com/grafana/tempo/modules/generator/processor/servicegraphs"
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/modules/generator/storage"
"github.com/grafana/tempo/pkg/tempopb"
)
@@ -52,7 +51,7 @@ type instance struct {
instanceID string
overrides metricsGeneratorOverrides

registry processor.Registry
registry *registry.ManagedRegistry
wal storage.Storage

// processorsMtx protects the processors map, not the processors itself
@@ -73,7 +72,7 @@ func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverr
instanceID: instanceID,
overrides: overrides,

registry: processor.NewRegistry(cfg.ExternalLabels, instanceID),
registry: registry.New(&cfg.Registry, overrides, instanceID, wal, logger),
wal: wal,

processors: make(map[string]processor.Processor),
@@ -165,9 +164,9 @@ func (i *instance) addProcessor(processorName string) error {
var newProcessor processor.Processor
switch processorName {
case spanmetrics.Name:
newProcessor = spanmetrics.New(i.cfg.Processor.SpanMetrics, i.instanceID)
newProcessor = spanmetrics.New(i.cfg.Processor.SpanMetrics, i.registry)
case servicegraphs.Name:
newProcessor = servicegraphs.New(i.cfg.Processor.ServiceGraphs, i.instanceID, i.logger)
newProcessor = servicegraphs.New(i.cfg.Processor.ServiceGraphs, i.instanceID, i.registry, i.logger)
default:
level.Error(i.logger).Log(
"msg", fmt.Sprintf("processor does not exist, supported processors: [%s]", strings.Join(allSupportedProcessors, ", ")),
@@ -181,11 +180,6 @@ func (i *instance) addProcessor(processorName string) error {
return nil
}

err := newProcessor.RegisterMetrics(i.registry)
if err != nil {
return fmt.Errorf("error registering metrics for %s: %w", processorName, err)
}

i.processors[processorName] = newProcessor

return nil
@@ -203,10 +197,7 @@ func (i *instance) removeProcessor(processorName string) {

delete(i.processors, processorName)

err := deletedProcessor.Shutdown(context.Background(), i.registry)
if err != nil {
level.Error(i.logger).Log("msg", "processor did not shutdown cleanly", "name", deletedProcessor.Name(), "err", err)
}
deletedProcessor.Shutdown(context.Background())
}

// updateProcessorMetrics updates the active processor metrics. Must be called under a read lock.
@@ -244,42 +235,22 @@ func (i *instance) updatePushMetrics(req *tempopb.PushSpansRequest) {
metricSpansIngested.WithLabelValues(i.instanceID).Add(float64(spanCount))
}

func (i *instance) collectMetrics(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.collectMetrics")
defer span.Finish()

traceID, _ := tracing.ExtractTraceID(ctx)
level.Info(i.logger).Log("msg", "collecting metrics", "traceID", traceID)

appender := i.wal.Appender(ctx)

err := i.registry.Gather(appender)
if err != nil {
return err
}

return appender.Commit()
}

// shutdown stops the instance and flushes any remaining data. After shutdown
// is called pushSpans should not be called anymore.
func (i *instance) shutdown(ctx context.Context) {
func (i *instance) shutdown() {
close(i.shutdownCh)

err := i.collectMetrics(ctx)
if err != nil {
level.Error(i.logger).Log("msg", "collecting metrics failed at shutdown", "err", err)
}

err = i.wal.Close()
if err != nil {
level.Error(i.logger).Log("msg", "closing wal failed", "tenant", i.instanceID, "err", err)
}

i.processorsMtx.Lock()
defer i.processorsMtx.Unlock()

for processorName := range i.processors {
i.removeProcessor(processorName)
}

i.registry.Close()

err := i.wal.Close()
if err != nil {
level.Error(i.logger).Log("msg", "closing wal failed", "tenant", i.instanceID, "err", err)
}
}
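Inside an instance, processors are now handed the tenant's registry at construction time (spanmetrics.New(i.cfg.Processor.SpanMetrics, i.registry) and servicegraphs.New(..., i.registry, ...)) instead of registering collectors against a Prometheus registerer, and shutdown closes the registry before the WAL. Below is a toy sketch of a processor recording through an injected registry; the Registry interface and NewCounter signature are hypothetical stand-ins, not the registry.ManagedRegistry API.

package main

import "fmt"

// Registry is a hypothetical stand-in for the metric-creation surface a
// processor would be handed; the real package defines its own types.
type Registry interface {
	NewCounter(name string) func(inc float64)
}

// spanCounterProcessor records how many spans it has seen via the injected
// registry instead of a global Prometheus registerer.
type spanCounterProcessor struct {
	addSpans func(inc float64)
}

func newSpanCounterProcessor(reg Registry) *spanCounterProcessor {
	return &spanCounterProcessor{addSpans: reg.NewCounter("spans_total")}
}

func (p *spanCounterProcessor) PushSpans(spanCount int) {
	p.addSpans(float64(spanCount))
}

// simpleRegistry is a toy in-memory implementation used only to make the
// sketch runnable.
type simpleRegistry struct{ values map[string]float64 }

func (r *simpleRegistry) NewCounter(name string) func(float64) {
	return func(inc float64) { r.values[name] += inc }
}

func main() {
	reg := &simpleRegistry{values: map[string]float64{}}
	p := newSpanCounterProcessor(reg)
	p.PushSpans(2)
	fmt.Println(reg.values["spans_total"]) // 2
}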
15 changes: 9 additions & 6 deletions modules/generator/instance_test.go
@@ -41,11 +41,6 @@ func Test_instance_concurrency(t *testing.T) {
instance.pushSpans(context.Background(), &tempopb.PushSpansRequest{Batches: []*v1.ResourceSpans{req}})
})

go accessor(func() {
err := instance.collectMetrics(context.Background())
assert.NoError(t, err)
})

go accessor(func() {
processors := map[string]struct{}{
"span-metrics": {},
@@ -64,7 +59,7 @@

time.Sleep(100 * time.Millisecond)

instance.shutdown(context.Background())
instance.shutdown()

time.Sleep(10 * time.Millisecond)
close(end)
@@ -118,6 +113,14 @@ type mockOverrides struct {

var _ metricsGeneratorOverrides = (*mockOverrides)(nil)

func (m *mockOverrides) MetricsGeneratorMaxActiveSeries(userID string) uint32 {
return 0
}

func (m *mockOverrides) MetricsGeneratorCollectionInterval(userID string) time.Duration {
return 15 * time.Second
}

func (m *mockOverrides) MetricsGeneratorProcessors(userID string) map[string]struct{} {
return m.processors
}
3 changes: 3 additions & 0 deletions modules/generator/overrides.go
@@ -1,10 +1,13 @@
package generator

import (
"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/modules/overrides"
)

type metricsGeneratorOverrides interface {
registry.Overrides

MetricsGeneratorProcessors(userID string) map[string]struct{}
}

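metricsGeneratorOverrides now embeds registry.Overrides, so the registry reads its per-tenant limits from the overrides module. Judging from the methods added to mockOverrides in instance_test.go above, the embedded interface covers at least a maximum number of active series and a per-tenant collection interval; the sketch below shows that shape, while the real definition lives in modules/generator/registry and may include more.

package registry

import "time"

// Overrides is a sketch of the per-tenant limits the registry consumes,
// inferred from the mockOverrides methods in instance_test.go.
type Overrides interface {
	// MetricsGeneratorMaxActiveSeries caps the number of active series per
	// tenant; the mock returns 0, presumably meaning unlimited.
	MetricsGeneratorMaxActiveSeries(userID string) uint32

	// MetricsGeneratorCollectionInterval is how often the tenant's series
	// are collected and written out; the mock returns 15s.
	MetricsGeneratorCollectionInterval(userID string) time.Duration
}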
11 changes: 3 additions & 8 deletions modules/generator/processor/interface.go
@@ -3,22 +3,17 @@ package processor
import (
"context"

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/tempo/pkg/tempopb"
)

type Processor interface {
// Name returns the name of the processor.
Name() string

// RegisterMetrics registers metrics that are emitted by this processor.
RegisterMetrics(reg prometheus.Registerer) error

// PushSpans processes a batch of spans and updates the metrics registered in RegisterMetrics.
PushSpans(ctx context.Context, req *tempopb.PushSpansRequest)

// Shutdown releases any resources allocated by the processor and unregisters metrics registered
// by RegisterMetrics. Once the processor is shut down, PushSpans should not be called anymore.
Shutdown(ctx context.Context, reg prometheus.Registerer) error
// Shutdown releases any resources allocated by the processor. Once the processor is shut down,
// PushSpans should not be called anymore.
Shutdown(ctx context.Context)
}
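The Processor interface loses RegisterMetrics, and Shutdown no longer receives a Registerer or returns an error: metric lifecycle is now owned by the registry each processor receives at construction. A minimal no-op implementation of the trimmed-down interface, written here only to illustrate its shape (noopProcessor is not part of the PR):

package processor

import (
	"context"

	"github.com/grafana/tempo/pkg/tempopb"
)

// noopProcessor satisfies the new Processor interface without doing any work.
type noopProcessor struct{}

func (noopProcessor) Name() string { return "noop" }

// PushSpans is where a real processor would update the metrics it created
// in the registry it was constructed with.
func (noopProcessor) PushSpans(_ context.Context, _ *tempopb.PushSpansRequest) {}

// Shutdown releases resources; it no longer unregisters metrics.
func (noopProcessor) Shutdown(_ context.Context) {}

// Compile-time check that noopProcessor implements Processor.
var _ Processor = noopProcessor{}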