Shared interner: fix leak of interned strings from scrape cache #21

Merged: 2 commits, Jun 8, 2021

10 changes: 10 additions & 0 deletions pkg/intern/intern.go

@@ -52,11 +52,18 @@ func New(r prometheus.Registerer) Interner {
 }
 
 type Metrics struct {
+    Strings             prometheus.Gauge
     NoReferenceReleases prometheus.Counter
 }
 
 func NewMetrics(r prometheus.Registerer) *Metrics {
     var m Metrics
+    m.Strings = prometheus.NewGauge(prometheus.GaugeOpts{
+        Namespace: "prometheus",
+        Subsystem: "interner",
+        Name:      "num_strings",
+        Help:      "The current number of interned strings",
+    })
     m.NoReferenceReleases = prometheus.NewCounter(prometheus.CounterOpts{
         Namespace: "prometheus",
         Subsystem: "interner",
@@ -65,6 +72,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
     })
 
     if r != nil {
+        r.MustRegister(m.Strings)
         r.MustRegister(m.NoReferenceReleases)
     }
 
@@ -109,6 +117,7 @@ func (p *pool) Intern(s string) string {
         return interned.s
     }
 
+    p.m.Strings.Inc()
     p.pool[s] = newEntry(s)
     p.pool[s].refs.Store(1)
     return s
@@ -134,6 +143,7 @@ func (p *pool) Release(s string) {
     if interned.refs.Load() != 0 {
         return
     }
+    p.m.Strings.Dec()
     delete(p.pool, s)
 }
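
On the interner side the change is pure observability: a gauge tracking how many distinct strings are currently interned. As a rough illustration of the contract that gauge measures, here is a self-contained sketch of a ref-counted pool with deliberately simplified, hypothetical types (no mutex, no atomics, a plain int standing in for the Prometheus gauge): the first Intern of a string increments the count, and only the Release that drops the last reference decrements it, so a caller that interns labels without ever releasing them produces a count that only grows.

package main

import "fmt"

// entry pairs an interned string with a reference count. The real pool
// guards its map with a mutex and uses an atomic counter.
type entry struct {
    s    string
    refs int
}

type pool struct {
    pool       map[string]*entry
    numStrings int // stands in for the prometheus_interner_num_strings gauge
}

func (p *pool) Intern(s string) string {
    if e, ok := p.pool[s]; ok {
        e.refs++
        return e.s // reuse the existing allocation
    }
    p.numStrings++ // gauge Inc(): a new string enters the pool
    p.pool[s] = &entry{s: s, refs: 1}
    return s
}

func (p *pool) Release(s string) {
    e, ok := p.pool[s]
    if !ok {
        return // the real pool counts this in NoReferenceReleases
    }
    e.refs--
    if e.refs != 0 {
        return
    }
    p.numStrings-- // gauge Dec(): the last reference is gone
    delete(p.pool, s)
}

func main() {
    p := &pool{pool: map[string]*entry{}}
    p.Intern("job")
    p.Intern("job") // second reference to the same entry
    p.Release("job")
    fmt.Println(p.numStrings) // 1: still referenced; a missing Release here is the leak
    p.Release("job")
    fmt.Println(p.numStrings) // 0: fully released
}

With the gauge registered, a leak like the one fixed below in scrape.go shows up as prometheus_interner_num_strings climbing steadily as targets and series churn.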

62 changes: 36 additions & 26 deletions scrape/scrape.go

@@ -275,7 +275,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
     // Update the targets retrieval function for metadata to a new scrape cache.
     cache := opts.cache
     if cache == nil {
-        cache = newScrapeCache()
+        cache = newScrapeCache(intern.Global)
     }
     opts.target.SetMetadataStore(cache)
 
@@ -396,7 +396,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
             oldLoop.disableEndOfRunStalenessMarkers()
             cache = oc
         } else {
-            cache = newScrapeCache()
+            cache = newScrapeCache(intern.Global)
         }
         var (
             t = sp.activeTargets[fp]
@@ -802,14 +802,16 @@ type scrapeCache struct {
     // string in addDropped().
     droppedSeries map[string]*uint64
 
-    // seriesCur and seriesPrev store the labels of series that were seen
-    // in the current and previous scrape.
+    // seriesCur and seriesPrev store the refs of series that were seen
+    // in the current and previous scrape, keyed by the series hash.
     // We hold two maps and swap them out to save allocations.
-    seriesCur  map[uint64]labels.Labels
-    seriesPrev map[uint64]labels.Labels
+    seriesCur  map[uint64]*cacheEntry
+    seriesPrev map[uint64]*cacheEntry
 
     metaMtx  sync.Mutex
     metadata map[string]*metaEntry
+
+    interner intern.Interner
 }
 
 // metaEntry holds meta information about a metric.
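
Two things happen in this struct. The staleness maps now hold *cacheEntry values instead of copied labels.Labels, so staleness tracking points at the same entry the series cache owns rather than carrying its own label set, and the cache gains an interner field so the interner is injected rather than hard-wired to the global. The "two maps, swap them out" trick mentioned in the comment is unchanged, but it is what lets trackStaleness run on every sample without allocating; here is a minimal runnable sketch of it, with a hypothetical tracker type and plain string values standing in for *cacheEntry:

package main

import "fmt"

// tracker is a stripped-down stand-in for scrapeCache's staleness
// bookkeeping.
type tracker struct {
    seriesCur  map[uint64]string
    seriesPrev map[uint64]string
}

// iterDone mirrors the end-of-scrape step: swap the two maps, then
// clear the new "current" map in place. Deleting keys keeps the map's
// buckets allocated, so steady-state scrapes allocate nothing here.
func (t *tracker) iterDone() {
    t.seriesCur, t.seriesPrev = t.seriesPrev, t.seriesCur
    for h := range t.seriesCur {
        delete(t.seriesCur, h)
    }
}

// forEachStale visits series seen in the previous scrape but missing
// from the current one, just like the method further down.
func (t *tracker) forEachStale(f func(string) bool) {
    for h, s := range t.seriesPrev {
        if _, ok := t.seriesCur[h]; !ok && !f(s) {
            break
        }
    }
}

func main() {
    t := &tracker{seriesCur: map[uint64]string{}, seriesPrev: map[uint64]string{}}
    t.seriesCur[1], t.seriesCur[2] = "up", "go_goroutines"
    t.iterDone()          // scrape 1 done; both series move to seriesPrev
    t.seriesCur[1] = "up" // scrape 2 sees only the first series
    t.forEachStale(func(s string) bool {
        fmt.Println("stale:", s) // prints: stale: go_goroutines
        return true
    })
}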

@@ -825,13 +825,17 @@ func (m *metaEntry) size() int {
     return len(m.help) + len(m.unit) + len(m.typ)
 }
 
-func newScrapeCache() *scrapeCache {
+func newScrapeCache(interner intern.Interner) *scrapeCache {
+    if interner == nil {
+        interner = intern.Global
+    }
     return &scrapeCache{
         series:        map[string]*cacheEntry{},
         droppedSeries: map[string]*uint64{},
-        seriesCur:     map[uint64]labels.Labels{},
-        seriesPrev:    map[uint64]labels.Labels{},
+        seriesCur:     map[uint64]*cacheEntry{},
+        seriesPrev:    map[uint64]*cacheEntry{},
         metadata:      map[string]*metaEntry{},
+        interner:      interner,
     }
 }
 
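Call sites can now inject the interner, with nil falling back to the shared global. The practical benefit is isolation: a test can hand a cache its own interner, registered on a private registry, so the interner metrics reflect that one cache. A hypothetical sketch of both uses (the test name and the pkg/intern import path are illustrative, not part of this PR):

package scrape

import (
    "testing"

    "github.com/prometheus/client_golang/prometheus"

    "github.com/example/project/pkg/intern" // hypothetical path: this repo's pkg/intern
)

func TestScrapeCacheInterner(t *testing.T) {
    // Equivalent, thanks to the nil fallback in newScrapeCache:
    shared := newScrapeCache(intern.Global)
    alsoShared := newScrapeCache(nil)

    // An isolated interner registers its metrics on a private registry,
    // so prometheus_interner_num_strings counts this cache's strings only:
    reg := prometheus.NewRegistry()
    isolated := newScrapeCache(intern.New(reg))

    _, _, _ = shared, alsoShared, isolated
}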

@@ -858,7 +864,7 @@ func (c *scrapeCache) iterDone(flushCache bool) {
     // that haven't appeared in the last scrape.
     for s, e := range c.series {
         if c.iter != e.lastIter {
-            intern.ReleaseLabels(intern.Global, e.lset)
+            intern.ReleaseLabels(c.interner, e.lset)
             delete(c.series, s)
         }
     }
@@ -897,11 +903,14 @@ func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
     return e, true
 }
 
-func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
-    if ref == 0 {
-        return
-    }
-    c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
+func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) *cacheEntry {
+    // The cache entries are used for staleness tracking, so even if ref is
+    // 0 we need to track it.
+    intern.InternLabels(c.interner, lset)
+
+    ce := &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
+    c.series[met] = ce
+    return ce
 }
 
 func (c *scrapeCache) addDropped(met string) {
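
This hunk is the heart of the fix. Previously the scrape loop interned labels ad hoc before appending (see the removed intern.InternLabels call sites further down), while addRef silently dropped any entry whose ref was 0, so the strings interned for such series had no cache entry left to release them: a leak. Now interning happens in exactly one place, addRef, and is paired with the intern.ReleaseLabels call in iterDone above. A condensed, runnable sketch of that pairing, with hypothetical simplified types and a plain counter in place of the interner:

package main

import "fmt"

// interned stands in for the shared interner's live-string count
// (the num_strings gauge from pkg/intern).
var interned int

func internLabels(lset []string)  { interned += len(lset) }
func releaseLabels(lset []string) { interned -= len(lset) }

// entry mirrors cacheEntry: the label strings plus the scrape
// iteration in which the series was last seen.
type entry struct {
    lset     []string
    lastIter uint64
}

type cache struct {
    iter   uint64
    series map[string]*entry
}

// addRef interns unconditionally: even an entry without a storage ref
// is tracked, so its strings always have a matching release.
func (c *cache) addRef(met string, lset []string) *entry {
    internLabels(lset)
    e := &entry{lset: lset, lastIter: c.iter}
    c.series[met] = e
    return e
}

// iterDone releases every entry that did not appear in the latest
// scrape; this is the counterpart that plugs the leak.
func (c *cache) iterDone() {
    for met, e := range c.series {
        if e.lastIter != c.iter {
            releaseLabels(e.lset)
            delete(c.series, met)
        }
    }
    c.iter++
}

func main() {
    c := &cache{series: map[string]*entry{}}
    c.addRef(`up{job="a"}`, []string{"__name__", "up", "job", "a"})
    c.iterDone()          // scrape 1: the entry was seen, nothing released
    c.iterDone()          // scrape 2: the entry went stale, strings released
    fmt.Println(interned) // 0: every intern found its release
}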

@@ -917,14 +926,14 @@ func (c *scrapeCache) getDropped(met string) bool {
     return ok
 }
 
-func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
-    c.seriesCur[hash] = lset
+func (c *scrapeCache) trackStaleness(ce *cacheEntry) {
+    c.seriesCur[ce.hash] = ce
 }
 
 func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
-    for h, lset := range c.seriesPrev {
-        if _, ok := c.seriesCur[h]; !ok {
-            if !f(lset) {
+    for hash, ce := range c.seriesPrev {
+        if _, ok := c.seriesCur[hash]; !ok {
+            if !f(ce.lset) {
                 break
             }
         }
@@ -1048,7 +1057,7 @@ func newScrapeLoop(ctx context.Context,
         buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
     }
     if cache == nil {
-        cache = newScrapeCache()
+        cache = newScrapeCache(intern.Global)
     }
     sl := &scrapeLoop{
         scraper: sc,
@@ -1421,7 +1430,6 @@ loop:
             }
         }
 
-        intern.InternLabels(intern.Global, lset)
         ref, err = app.Append(ref, lset, t, v)
         sampleAdded, err = sl.checkAddError(ce, met, tp, err, &sampleLimitErr, &appErrs)
         if err != nil {
@@ -1432,14 +1440,17 @@
         }
 
         if !ok {
+            ce := sl.cache.addRef(mets, ref, lset, hash)
             if tp == nil {
                 // Bypass staleness logic if there is an explicit timestamp.
-                sl.cache.trackStaleness(hash, lset)
+                sl.cache.trackStaleness(ce)
             }
-            sl.cache.addRef(mets, ref, lset, hash)
             if sampleAdded && sampleLimitErr == nil {
                 seriesAdded++
             }
+        } else if ce.ref != ref {
+            // Update the ref if it was invalidated.
+            ce.ref = ref
         }
 
         // Increment added even if there's an error so we correctly report the
@@ -1504,7 +1515,7 @@ func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err e
     switch errors.Cause(err) {
     case nil:
         if tp == nil && ce != nil {
-            sl.cache.trackStaleness(ce.hash, ce.lset)
+            sl.cache.trackStaleness(ce)
         }
         return true, nil
     case storage.ErrNotFound:
@@ -1630,7 +1641,6 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v
     switch errors.Cause(err) {
     case nil:
         if !ok {
-            intern.InternLabels(intern.Global, lset)
             sl.cache.addRef(s, ref, lset, lset.Hash())
         }
         return nil