remove global usage from aggregator
GustavoCaso committed Oct 18, 2024
1 parent 025f898 commit 1ec650d
Showing 21 changed files with 124 additions and 65 deletions.
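Every hunk below applies the same refactor: code that used to reach for the package-level tagger (tagger.EnrichTags, tagger.ChecksCardinality, tagger.AgentTags) now receives a tagger.Component through its constructor and goes through that injected value. The sketch below is illustrative only — the interface and names are hypothetical, not the datadog-agent API — but it shows the shape of the change: constructor injection plus function fields that are easy to stub in tests, which is what NewBufferedAggregator and newCheckSampler gain in this diff.

```go
package main

import "fmt"

// Tagger stands in for the tagger component; the interface and method
// signatures here are hypothetical, not the actual datadog-agent API.
type Tagger interface {
	EnrichTags(tags []string, origin string) []string
	ChecksCardinality() string
}

// Aggregator mirrors the shape of the change: instead of calling a
// package-level tagger, it holds the injected component and, for easy
// stubbing in tests, function fields bound to that component's methods.
type Aggregator struct {
	tagger            Tagger
	enrichTags        func(tags []string, origin string) []string
	checksCardinality func() string
}

// NewAggregator receives the tagger explicitly, the way NewBufferedAggregator
// and newCheckSampler gain a tagger.Component parameter in this commit.
func NewAggregator(t Tagger) *Aggregator {
	return &Aggregator{
		tagger:            t,
		enrichTags:        t.EnrichTags,
		checksCardinality: t.ChecksCardinality,
	}
}

// addEvent goes through the injected dependency rather than a global
// (previously: tagger.EnrichTags(tb, e.OriginInfo)).
func (a *Aggregator) addEvent(tags []string, origin string) []string {
	return a.enrichTags(tags, origin)
}

// noopTagger is the kind of stand-in the tests use (nooptagger.NewTaggerClient
// and taggerimpl.MockModule in the real diff).
type noopTagger struct{}

func (noopTagger) EnrichTags(tags []string, _ string) []string { return tags }
func (noopTagger) ChecksCardinality() string                   { return "low" }

func main() {
	agg := NewAggregator(noopTagger{})
	fmt.Println(agg.addEvent([]string{"service:web"}, "origin-1"))
}
```

In the diff itself this corresponds to the Tagger, enrichTags, checksCardinality, and tagger fields added to BufferedAggregator and the extra tagger.Component parameter threaded through the constructors.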
5 changes: 3 additions & 2 deletions cmd/serverless/main.go
@@ -111,7 +111,7 @@ func runAgent(tagger tagger.Component) {
lambdaInitMetricChan := make(chan *serverlessLogs.LambdaInitMetric)
//nolint:revive // TODO(SERV) Fix revive linter
coldStartSpanId := random.Random.Uint64()
metricAgent := startMetricAgent(serverlessDaemon, logChannel, lambdaInitMetricChan)
metricAgent := startMetricAgent(serverlessDaemon, logChannel, lambdaInitMetricChan, tagger)

// Concurrently start heavyweight features
var wg sync.WaitGroup
@@ -205,9 +205,10 @@ func setupProxy(appsecProxyProcessor *httpsec.ProxyLifecycleProcessor, ta trace.
}
}

func startMetricAgent(serverlessDaemon *daemon.Daemon, logChannel chan *logConfig.ChannelMessage, lambdaInitMetricChan chan *serverlessLogs.LambdaInitMetric) *metrics.ServerlessMetricAgent {
func startMetricAgent(serverlessDaemon *daemon.Daemon, logChannel chan *logConfig.ChannelMessage, lambdaInitMetricChan chan *serverlessLogs.LambdaInitMetric, tagger tagger.Component) *metrics.ServerlessMetricAgent {
metricAgent := &metrics.ServerlessMetricAgent{
SketchesBucketOffset: time.Second * 10,
Tagger: tagger,
}
metricAgent.Start(daemon.FlushTimeout, &metrics.MetricConfig{}, &metrics.MetricDogStatsD{})
serverlessDaemon.SetStatsdServer(metricAgent)

@@ -16,6 +16,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/status"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
@@ -42,6 +43,7 @@ type dependencies struct {
OrchestratorForwarder orchestratorforwarder.Component
EventPlatformForwarder eventplatform.Component
Compressor compression.Component
Tagger tagger.Component

Params Params
}
@@ -85,7 +87,9 @@ func newDemultiplexer(deps dependencies) (provides, error) {
options,
deps.EventPlatformForwarder,
deps.Compressor,
hostnameDetected)
deps.Tagger,
hostnameDetected,
)
demultiplexer := demultiplexer{
AgentDemultiplexer: agentDemultiplexer,
}
25 changes: 16 additions & 9 deletions pkg/aggregator/aggregator.go
@@ -27,6 +27,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/serializer"
"github.com/DataDog/datadog-agent/pkg/serializer/split"
"github.com/DataDog/datadog-agent/pkg/status/health"
taggertypes "github.com/DataDog/datadog-agent/pkg/tagger/types"
"github.com/DataDog/datadog-agent/pkg/tagset"
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util"
@@ -261,10 +262,12 @@ type BufferedAggregator struct {
health *health.Handle
agentName string // Name of the agent for telemetry metrics

tlmContainerTagsEnabled bool // Whether we should call the tagger to tag agent telemetry metrics
agentTags func(types.TagCardinality) ([]string, error) // This function gets the agent tags from the tagger (defined as a struct field to ease testing)
globalTags func(types.TagCardinality) ([]string, error) // This function gets global tags from the tagger when host tags are not available

tlmContainerTagsEnabled bool // Whether we should call the tagger to tag agent telemetry metrics
agentTags func(types.TagCardinality) ([]string, error) // This function gets the agent tags from the tagger (defined as a struct field to ease testing)
globalTags func(types.TagCardinality) ([]string, error) // This function gets global tags from the tagger when host tags are not available
enrichTags func(tagset.TagsAccumulator, taggertypes.OriginInfo)
checksCardinality func() types.TagCardinality
tagger tagger.Component
flushAndSerializeInParallel FlushAndSerializeInParallel
}

@@ -283,7 +286,7 @@ func NewFlushAndSerializeInParallel(config model.Config) FlushAndSerializeInPara
}

// NewBufferedAggregator instantiates a BufferedAggregator
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, hostname string, flushInterval time.Duration) *BufferedAggregator {
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator {
bufferSize := pkgconfigsetup.Datadog().GetInt("aggregator_buffer_size")

agentName := flavor.GetFlavor()
@@ -336,6 +339,9 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder
tlmContainerTagsEnabled: pkgconfigsetup.Datadog().GetBool("basic_telemetry_add_container_tags"),
agentTags: tagger.AgentTags,
globalTags: tagger.GlobalTags,
enrichTags: tagger.EnrichTags,
checksCardinality: tagger.ChecksCardinality,
tagger: tagger,
flushAndSerializeInParallel: NewFlushAndSerializeInParallel(pkgconfigsetup.Datadog()),
}

@@ -475,7 +481,7 @@ func (agg *BufferedAggregator) addServiceCheck(sc servicecheck.ServiceCheck) {
sc.Ts = time.Now().Unix()
}
tb := tagset.NewHashlessTagsAccumulatorFromSlice(sc.Tags)
tagger.EnrichTags(tb, sc.OriginInfo)
agg.enrichTags(tb, sc.OriginInfo)

tb.SortUniq()
sc.Tags = tb.Get()
@@ -489,7 +495,7 @@ func (agg *BufferedAggregator) addEvent(e event.Event) {
e.Ts = time.Now().Unix()
}
tb := tagset.NewHashlessTagsAccumulatorFromSlice(e.Tags)
tagger.EnrichTags(tb, e.OriginInfo)
agg.enrichTags(tb, e.OriginInfo)

tb.SortUniq()
e.Tags = tb.Get()
@@ -837,13 +843,13 @@ func (agg *BufferedAggregator) tags(withVersion bool) []string {
var tags []string

var err error
tags, err = agg.globalTags(tagger.ChecksCardinality())
tags, err = agg.globalTags(agg.checksCardinality())
if err != nil {
log.Debugf("Couldn't get Global tags: %v", err)
}

if agg.tlmContainerTagsEnabled {
agentTags, err := agg.agentTags(tagger.ChecksCardinality())
agentTags, err := agg.agentTags(agg.checksCardinality())
if err == nil {
if tags == nil {
tags = agentTags
@@ -955,5 +961,6 @@ func (agg *BufferedAggregator) handleRegisterSampler(id checkid.ID) {
pkgconfigsetup.Datadog().GetDuration("check_sampler_stateful_metric_expiration_time"),
agg.tagsStore,
id,
agg.tagger,
)
}
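The aggregator_test.go, check_sampler_bench_test.go, and check_sampler_test.go changes that follow show the payoff: each test now builds its own tagger (an fx mock via taggerimpl.MockModule() or a no-op client from nooptagger.NewTaggerClient()) instead of depending on process-wide tagger state. The self-contained sketch below — with invented types and test names, not the real ones — illustrates why keeping enrichTags as a struct field makes that kind of isolated testing straightforward.

```go
// File: aggregator_sketch_test.go (hypothetical, for illustration only).
package example

import (
	"reflect"
	"testing"
)

// aggregator is a cut-down stand-in for BufferedAggregator: tag enrichment
// lives in a function field rather than a package-level call.
type aggregator struct {
	enrichTags func(tags []string, origin string) []string
}

func (a *aggregator) addEvent(tags []string, origin string) []string {
	return a.enrichTags(tags, origin)
}

// TestAddEventUsesInjectedEnrichTags swaps in a stub and checks it was used,
// with no global tagger involved.
func TestAddEventUsesInjectedEnrichTags(t *testing.T) {
	var gotOrigin string
	agg := &aggregator{
		enrichTags: func(tags []string, origin string) []string {
			gotOrigin = origin
			return append(tags, "env:test")
		},
	}

	out := agg.addEvent([]string{"service:web"}, "origin-1")

	if gotOrigin != "origin-1" {
		t.Fatalf("expected origin to be forwarded, got %q", gotOrigin)
	}
	if !reflect.DeepEqual(out, []string{"service:web", "env:test"}) {
		t.Fatalf("unexpected tags: %v", out)
	}
}
```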
18 changes: 13 additions & 5 deletions pkg/aggregator/aggregator_test.go
@@ -22,6 +22,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
@@ -144,7 +146,8 @@ func TestAddServiceCheckDefaultValues(t *testing.T) {
// -

s := &MockSerializerIterableSerie{}
agg := NewBufferedAggregator(s, nil, "resolved-hostname", DefaultFlushInterval)
taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule())
agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)

agg.addServiceCheck(servicecheck.ServiceCheck{
// leave Host and Ts fields blank
@@ -176,7 +179,8 @@ func TestAddEventDefaultValues(t *testing.T) {
// -

s := &MockSerializerIterableSerie{}
agg := NewBufferedAggregator(s, nil, "resolved-hostname", DefaultFlushInterval)
taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule())
agg := NewBufferedAggregator(s, nil, taggerComponent, "resolved-hostname", DefaultFlushInterval)

agg.addEvent(event.Event{
// only populate required fields
@@ -225,7 +229,9 @@ func TestDefaultData(t *testing.T) {
// -

s := &MockSerializerIterableSerie{}
agg := NewBufferedAggregator(s, nil, "hostname", DefaultFlushInterval)
taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule())
agg := NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval)

start := time.Now()

// Check only the name for `datadog.agent.up` as the timestamp may not be the same.
@@ -577,7 +583,8 @@ func TestTags(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
defer pkgconfigsetup.Datadog().SetWithoutSource("basic_telemetry_add_container_tags", nil)
pkgconfigsetup.Datadog().SetWithoutSource("basic_telemetry_add_container_tags", tt.tlmContainerTagsEnabled)
agg := NewBufferedAggregator(nil, nil, tt.hostname, time.Second)
taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule())
agg := NewBufferedAggregator(nil, nil, taggerComponent, tt.hostname, time.Second)
agg.agentTags = tt.agentTags
agg.globalTags = tt.globalTags
assert.ElementsMatch(t, tt.want, agg.tags(tt.withVersion))
@@ -613,7 +620,8 @@ func TestAddDJMRecurrentSeries(t *testing.T) {

s := &MockSerializerIterableSerie{}
// NewBufferedAggregator with DJM enable will create a new recurrentSeries
NewBufferedAggregator(s, nil, "hostname", DefaultFlushInterval)
taggerComponent := fxutil.Test[tagger.Mock](t, taggerimpl.MockModule())
NewBufferedAggregator(s, nil, taggerComponent, "hostname", DefaultFlushInterval)

expectedRecurrentSeries := metrics.Series{&metrics.Serie{
Name: "datadog.djm.agent_host",
5 changes: 3 additions & 2 deletions pkg/aggregator/check_sampler.go
@@ -9,6 +9,7 @@ import (
"math"
"time"

"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/pkg/aggregator/ckey"
"github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
@@ -33,12 +34,12 @@ type CheckSampler struct {
}

// newCheckSampler returns a newly initialized CheckSampler
func newCheckSampler(expirationCount int, expireMetrics bool, contextResolverMetrics bool, statefulTimeout time.Duration, cache *tags.Store, id checkid.ID) *CheckSampler {
func newCheckSampler(expirationCount int, expireMetrics bool, contextResolverMetrics bool, statefulTimeout time.Duration, cache *tags.Store, id checkid.ID, tagger tagger.Component) *CheckSampler {
return &CheckSampler{
id: id,
series: make([]*metrics.Serie, 0),
sketches: make(metrics.SketchSeriesList, 0),
contextResolver: newCountBasedContextResolver(expirationCount, cache, string(id)),
contextResolver: newCountBasedContextResolver(expirationCount, cache, tagger, string(id)),
metrics: metrics.NewCheckMetrics(expireMetrics, statefulTimeout),
sketchMap: make(sketchMap),
lastBucketValue: make(map[ckey.ContextKey]int64),
10 changes: 7 additions & 3 deletions pkg/aggregator/check_sampler_bench_test.go
@@ -13,6 +13,8 @@ import (
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/hostname"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl"
@@ -43,16 +45,17 @@ func benchmarkAddBucket(bucketValue int64, b *testing.B) {
// For some reasons using InitAggregator[WithInterval] doesn't fix the problem,
// but this do.
deps := fxutil.Test[benchmarkDeps](b, core.MockBundle())
taggerComponent := fxutil.Test[tagger.Mock](b, taggerimpl.MockModule())
forwarderOpts := forwarder.NewOptionsWithResolvers(pkgconfigsetup.Datadog(), deps.Log, resolver.NewSingleDomainResolvers(map[string][]string{"hello": {"world"}}))
options := DefaultAgentDemultiplexerOptions()
options.DontStartForwarders = true
sharedForwarder := forwarder.NewDefaultForwarder(pkgconfigsetup.Datadog(), deps.Log, forwarderOpts)
orchestratorForwarder := optional.NewOption[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(deps.Hostname))
demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, "hostname")
demux := InitAndStartAgentDemultiplexer(deps.Log, sharedForwarder, &orchestratorForwarder, options, eventPlatformForwarder, deps.Compressor, taggerComponent, "hostname")
defer demux.Stop(true)

checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"))
checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"), taggerComponent)

bucket := &metrics.HistogramBucket{
Name: "my.histogram",
@@ -71,7 +74,8 @@ }
}

func benchmarkAddBucketWideBounds(bucketValue int64, b *testing.B) {
checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"))
taggerComponent := fxutil.Test[tagger.Mock](b, taggerimpl.MockModule())
checkSampler := newCheckSampler(1, true, true, 1000, tags.NewStore(true, "bench"), checkid.ID("hello:world:1234"), taggerComponent)

bounds := []float64{0, .0005, .001, .003, .005, .007, .01, .015, .02, .025, .03, .04, .05, .06, .07, .08, .09, .1, .5, 1, 5, 10}
bucket := &metrics.HistogramBucket{
26 changes: 17 additions & 9 deletions pkg/aggregator/check_sampler_test.go
@@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/comp/core/tagger"
nooptagger "github.com/DataDog/datadog-agent/comp/core/tagger/noopimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator/ckey"
"github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
@@ -27,12 +27,14 @@ import (
func generateContextKey(sample metrics.MetricSampleContext) ckey.ContextKey {
k := ckey.NewKeyGenerator()
tb := tagset.NewHashingTagsAccumulator()
sample.GetTags(tb, tb, tagger.EnrichTags)
taggerComponent := nooptagger.NewTaggerClient()
sample.GetTags(tb, tb, taggerComponent.EnrichTags)
return k.Generate(sample.GetName(), sample.GetHost(), tb)
}

func testCheckGaugeSampling(t *testing.T, store *tags.Store) {
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"))
taggerComponent := nooptagger.NewTaggerClient()
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"), taggerComponent)

mSample1 := metrics.MetricSample{
Name: "my.metric.name",
@@ -95,7 +97,8 @@ func TestCheckGaugeSampling(t *testing.T) {
}

func testCheckRateSampling(t *testing.T, store *tags.Store) {
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"))
taggerComponent := nooptagger.NewTaggerClient()
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"), taggerComponent)

mSample1 := metrics.MetricSample{
Name: "my.metric.name",
@@ -148,7 +151,8 @@ func TestCheckRateSampling(t *testing.T) {
}

func testHistogramCountSampling(t *testing.T, store *tags.Store) {
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"))
taggerComponent := nooptagger.NewTaggerClient()
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"), taggerComponent)

mSample1 := metrics.MetricSample{
Name: "my.metric.name",
@@ -213,7 +217,8 @@ func TestHistogramCountSampling(t *testing.T) {
}

func testCheckHistogramBucketSampling(t *testing.T, store *tags.Store) {
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"))
taggerComponent := nooptagger.NewTaggerClient()
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"), taggerComponent)

bucket1 := &metrics.HistogramBucket{
Name: "my.histogram",
@@ -290,7 +295,8 @@ func TestCheckHistogramBucketSampling(t *testing.T) {
}

func testCheckHistogramBucketDontFlushFirstValue(t *testing.T, store *tags.Store) {
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"))
taggerComponent := nooptagger.NewTaggerClient()
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"), taggerComponent)

bucket1 := &metrics.HistogramBucket{
Name: "my.histogram",
@@ -345,7 +351,8 @@ func TestCheckHistogramBucketDontFlushFirstValue(t *testing.T) {
}

func testCheckHistogramBucketInfinityBucket(t *testing.T, store *tags.Store) {
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"))
taggerComponent := nooptagger.NewTaggerClient()
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"), taggerComponent)

bucket1 := &metrics.HistogramBucket{
Name: "my.histogram",
@@ -380,7 +387,8 @@ func TestCheckHistogramBucketInfinityBucket(t *testing.T) {
}

func testCheckDistribution(t *testing.T, store *tags.Store) {
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"))
taggerComponent := nooptagger.NewTaggerClient()
checkSampler := newCheckSampler(1, true, true, 1*time.Second, store, checkid.ID("hello:world:1234"), taggerComponent)

mSample1 := metrics.MetricSample{
Name: "my.metric.name",