diff --git a/scrape/manager.go b/scrape/manager.go
index 427b9f2be18..83f5bd76592 100644
--- a/scrape/manager.go
+++ b/scrape/manager.go
@@ -379,3 +379,14 @@ func (m *Manager) TargetsDroppedCounts() map[string]int {
 	}
 	return counts
 }
+
+// DisableEndOfRunStalenessMarkers disables the end-of-run staleness markers for the provided targets in the given
+// targetSet. When end-of-run staleness markers are disabled for a target, no staleness markers will be written for
+// its series when the target goes away.
+func (m *Manager) DisableEndOfRunStalenessMarkers(targetSet string, targets []*Target) {
+	m.mtxScrape.Lock()
+	defer m.mtxScrape.Unlock()
+	if pool, ok := m.scrapePools[targetSet]; ok {
+		pool.disableEndOfRunStalenessMarkers(targets)
+	}
+}
diff --git a/scrape/manager_test.go b/scrape/manager_test.go
index 50f63201376..e70439d8192 100644
--- a/scrape/manager_test.go
+++ b/scrape/manager_test.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/exp/slices"
 	"gopkg.in/yaml.v2"
 
 	"github.com/prometheus/prometheus/config"
@@ -703,3 +704,51 @@ scrape_configs:
 	reload(scrapeManager, cfg2)
 	require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
 }
+
+func TestManagerDisableEndOfRunStalenessMarkers(t *testing.T) {
+	configForJob := func(jobName string) *config.ScrapeConfig {
+		return &config.ScrapeConfig{
+			JobName:        jobName,
+			ScrapeInterval: model.Duration(1 * time.Minute),
+			ScrapeTimeout:  model.Duration(1 * time.Minute),
+		}
+	}
+
+	cfg := &config.Config{ScrapeConfigs: []*config.ScrapeConfig{
+		configForJob("one"),
+		configForJob("two"),
+	}}
+
+	m := NewManager(&Options{}, nil, &nopAppendable{})
+	defer m.Stop()
+	require.NoError(t, m.ApplyConfig(cfg))
+
+	// Pass targets to the manager.
+	tgs := map[string][]*targetgroup.Group{
+		"one": {{Targets: []model.LabelSet{{"__address__": "h1"}, {"__address__": "h2"}, {"__address__": "h3"}}}},
+		"two": {{Targets: []model.LabelSet{{"__address__": "h4"}}}},
+	}
+	m.updateTsets(tgs)
+	m.reload()
+
+	activeTargets := m.TargetsActive()
+	targetsToDisable := []*Target{
+		activeTargets["one"][0],
+		activeTargets["one"][2],
+		NewTarget(labels.FromStrings("__address__", "h4"), labels.EmptyLabels(), nil), // non-existent target.
+	}
+
+	// Disable end of run staleness markers for some targets.
+	m.DisableEndOfRunStalenessMarkers("one", targetsToDisable)
+	// This should be a no-op.
+	m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable)
+
+	// Check that the end of run staleness markers are disabled for the correct targets.
+	for _, group := range []string{"one", "two"} {
+		for _, tg := range activeTargets[group] {
+			loop := m.scrapePools[group].loops[tg.hash()].(*scrapeLoop)
+			expectedDisabled := slices.Contains(targetsToDisable, tg)
+			require.Equal(t, expectedDisabled, loop.disabledEndOfRunStalenessMarkers.Load())
+		}
+	}
+}
diff --git a/scrape/scrape.go b/scrape/scrape.go
index 5db309008c7..d1780bbc9d7 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -34,6 +34,7 @@ import (
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/version"
+	"go.uber.org/atomic"
 	"golang.org/x/exp/slices"
 
 	"github.com/prometheus/prometheus/config"
@@ -668,6 +669,16 @@ func (sp *scrapePool) refreshTargetLimitErr() error {
 	return nil
 }
 
+func (sp *scrapePool) disableEndOfRunStalenessMarkers(targets []*Target) {
+	sp.mtx.Lock()
+	defer sp.mtx.Unlock()
+	for i := range targets {
+		if l, ok := sp.loops[targets[i].hash()]; ok {
+			l.disableEndOfRunStalenessMarkers()
+		}
+	}
+}
+
 func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
 	if limits == nil {
 		return nil
@@ -933,7 +944,7 @@ type scrapeLoop struct {
 	cancel   func()
 	stopped  chan struct{}
 
-	disabledEndOfRunStalenessMarkers bool
+	disabledEndOfRunStalenessMarkers atomic.Bool
 
 	reportExtraMetrics  bool
 	appendMetadataToWAL bool
@@ -1317,7 +1328,7 @@ mainLoop:
 
 	close(sl.stopped)
 
-	if !sl.disabledEndOfRunStalenessMarkers {
+	if !sl.disabledEndOfRunStalenessMarkers.Load() {
 		sl.endOfRunStaleness(last, ticker, sl.interval)
 	}
 }
@@ -1478,6 +1489,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
 	case <-time.After(interval / 10):
 	}
 
+	// Check if end-of-run staleness markers have been disabled while we were waiting.
+	if sl.disabledEndOfRunStalenessMarkers.Load() {
+		return
+	}
+
 	// Call sl.append again with an empty scrape to trigger stale markers.
 	// If the target has since been recreated and scraped, the
 	// stale markers will be out of order and ignored.
@@ -1512,7 +1528,7 @@ func (sl *scrapeLoop) stop() {
 }
 
 func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
-	sl.disabledEndOfRunStalenessMarkers = true
+	sl.disabledEndOfRunStalenessMarkers.Store(true)
 }
 
 func (sl *scrapeLoop) getCache() *scrapeCache {
diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go
index d5216311b2e..27e62ef2d4b 100644
--- a/scrape/scrape_test.go
+++ b/scrape/scrape_test.go
@@ -3817,3 +3817,68 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *
 	require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
 		"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
 }
+
+func TestScrapeLoopDisableStalenessMarkerInjection(t *testing.T) {
+	var (
+		loopDone = make(chan struct{}, 1)
+		appender = &collectResultAppender{}
+		scraper  = &testScraper{}
+		app      = func(ctx context.Context) storage.Appender { return appender }
+	)
+
+	sl := newScrapeLoop(context.Background(),
+		scraper,
+		nil, nil,
+		nopMutator,
+		nopMutator,
+		app,
+		nil,
+		0,
+		true,
+		false,
+		0, 0,
+		nil,
+		10*time.Millisecond,
+		time.Hour,
+		false,
+		false,
+		false,
+		nil,
+		false,
+	)
+
+	// Count scrapes and terminate loop after 2 scrapes.
+	numScrapes, maxScrapes := 0, 2
+	scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+		numScrapes++
+		if numScrapes >= maxScrapes {
+			go sl.stop()
+			<-sl.ctx.Done()
+		}
+		if _, err := w.Write([]byte("metric_a 42\n")); err != nil {
+			return err
+		}
+		return ctx.Err()
+	}
+
+	go func() {
+		sl.run(nil)
+		loopDone <- struct{}{}
+	}()
+
+	// Disable end of run staleness markers.
+	sl.disableEndOfRunStalenessMarkers()
+
+	select {
+	case <-loopDone:
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Scrape loop didn't stop.")
+	}
+
+	// No stale markers should be appended, since they were disabled.
+	for _, s := range appender.resultFloats {
+		if value.IsStaleNaN(s.f) {
+			t.Fatalf("Got stale NaN samples while end of run staleness is disabled: %x", math.Float64bits(s.f))
+		}
+	}
+}
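
Usage sketch: a minimal example of how a caller outside the scrape package might drive the new Manager API. The helper name disableStalenessForJob and the idea of disabling markers for every active target of one job are illustrative assumptions; only Manager.TargetsActive and the Manager.DisableEndOfRunStalenessMarkers method added by this patch are real API.

package example

import "github.com/prometheus/prometheus/scrape"

// disableStalenessForJob disables end-of-run staleness markers for all targets
// currently active in the given scrape pool (job). Illustrative helper only,
// not part of this patch.
func disableStalenessForJob(m *scrape.Manager, jobName string) {
	// TargetsActive returns the currently active targets keyed by scrape pool name.
	targets := m.TargetsActive()[jobName]
	// Per the new method's behaviour, this is a no-op if the pool does not exist
	// or if a given target has no running scrape loop.
	m.DisableEndOfRunStalenessMarkers(jobName, targets)
}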