Skip to content

Commit

Permalink
Fetcher: Fix data races (#4663)
Browse files Browse the repository at this point in the history
* Fix data race for cached map

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Fix data race for ReplicaLabelRemover

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Fix data race for IgnoreDeletionMarkFilter

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Update CHANGELOG.md

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Newline

Signed-off-by: Matej Gera <matejgera@gmail.com>

* Improve deletionMarkMap

Signed-off-by: Matej Gera <matejgera@gmail.com>
  • Loading branch information
matej-g authored Sep 21, 2021
1 parent 0c27120 commit b77b8ee
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Fixed

- [#4663](https://github.com/thanos-io/thanos/pull/4663) Fetcher: Fix discovered data races

### Added

- [#4680](https://github.com/thanos-io/thanos/pull/4680) Query: add `exemplar.partial-response` flag to control partial response.

## v0.23.0 - In Progress
Expand Down
55 changes: 44 additions & 11 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,11 @@ type BaseFetcher struct {

// Optional local directory to cache meta.json files.
cacheDir string
cached map[ulid.ULID]*metadata.Meta
syncs prometheus.Counter
g singleflight.Group

mtx sync.Mutex
cached map[ulid.ULID]*metadata.Meta
}

// NewBaseFetcher constructs BaseFetcher.
Expand Down Expand Up @@ -386,7 +388,10 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
for id, m := range resp.metas {
cached[id] = m
}

f.mtx.Lock()
f.cached = cached
f.mtx.Unlock()

// Best effort cleanup of disk-cached metas.
if f.cacheDir != "" {
Expand Down Expand Up @@ -473,10 +478,17 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
return metas, resp.partial, errors.Wrap(resp.metaErrs.Err(), "incomplete view")
}

level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "duration_ms", time.Since(start).Milliseconds(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial))
level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "duration_ms", time.Since(start).Milliseconds(), "cached", f.countCached(), "returned", len(metas), "partial", len(resp.partial))
return metas, resp.partial, nil
}

// countCached reports how many block metas are currently cached.
// It takes f.mtx because fetchMetadata swaps f.cached under the same lock.
func (f *BaseFetcher) countCached() int {
	f.mtx.Lock()
	n := len(f.cached)
	f.mtx.Unlock()

	return n
}

type MetaFetcher struct {
wrapped *BaseFetcher
metrics *FetcherMetrics
Expand Down Expand Up @@ -711,7 +723,11 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met
}

for u, meta := range metas {
l := meta.Thanos.Labels
l := make(map[string]string)
for n, v := range meta.Thanos.Labels {
l[n] = v
}

for _, replicaLabel := range r.replicaLabels {
if _, exists := l[replicaLabel]; exists {
level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel)
Expand All @@ -723,7 +739,10 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met
level.Warn(r.logger).Log("msg", "block has no labels left, creating one", r.replicaLabels[0], "deduped")
l[r.replicaLabels[0]] = "deduped"
}
metas[u].Thanos.Labels = l

nm := *meta
nm.Thanos.Labels = l
metas[u] = &nm
}
return nil
}
Expand Down Expand Up @@ -778,10 +797,12 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
// Delay is not considered when computing DeletionMarkBlocks map.
// Not go-routine safe.
// NOTE(review): the rendered diff duplicated the field list (old and new
// hunks fused); this is the deduplicated post-change struct.
type IgnoreDeletionMarkFilter struct {
	logger      log.Logger
	delay       time.Duration
	concurrency int
	bkt         objstore.InstrumentedBucketReader

	// mtx guards deletionMarkMap, which Filter rebuilds and
	// DeletionMarkBlocks reads concurrently.
	mtx             sync.Mutex
	deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
}

Expand All @@ -797,13 +818,21 @@ func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBuc

// DeletionMarkBlocks returns block ids that were marked for deletion.
// It copies the internal map under f.mtx so callers cannot race with
// Filter, which rebuilds and swaps the map concurrently. (The rendered
// diff left the stale pre-change `return f.deletionMarkMap` interleaved
// here, which returned the shared map unlocked and made the copy
// unreachable; it is removed.)
func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark {
	f.mtx.Lock()
	defer f.mtx.Unlock()

	deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark, len(f.deletionMarkMap))
	for id, meta := range f.deletionMarkMap {
		deletionMarkMap[id] = meta
	}

	return deletionMarkMap
}

// Filter filters out blocks that are marked for deletion after a given delay.
// It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)
deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark)

// Make a copy of block IDs to check, in order to avoid concurrency issues
// between the scheduler and workers.
Expand Down Expand Up @@ -839,7 +868,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
// Keep track of the blocks marked for deletion and filter them out if their
// deletion time is greater than the configured delay.
mtx.Lock()
f.deletionMarkMap[id] = m
deletionMarkMap[id] = m
if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(MarkedForDeletionMeta).Inc()
delete(metas, id)
Expand Down Expand Up @@ -871,6 +900,10 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
return errors.Wrap(err, "filter blocks marked for deletion")
}

f.mtx.Lock()
f.deletionMarkMap = deletionMarkMap
f.mtx.Unlock()

return nil
}

Expand Down

0 comments on commit b77b8ee

Please sign in to comment.