Fetcher: Fix data races #4663
Changes from 4 commits
```diff
@@ -162,9 +162,11 @@ type BaseFetcher struct {
 
 	// Optional local directory to cache meta.json files.
 	cacheDir string
-	cached   map[ulid.ULID]*metadata.Meta
 	syncs    prometheus.Counter
 	g        singleflight.Group
+
+	mtx    sync.Mutex
+	cached map[ulid.ULID]*metadata.Meta
 }
 
 // NewBaseFetcher constructs BaseFetcher.
```
```diff
@@ -386,7 +388,10 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
 	for id, m := range resp.metas {
 		cached[id] = m
 	}
+
+	f.mtx.Lock()
 	f.cached = cached
+	f.mtx.Unlock()
 
 	// Best effort cleanup of disk-cached metas.
 	if f.cacheDir != "" {
```
```diff
@@ -473,10 +478,17 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filter
 		return metas, resp.partial, errors.Wrap(resp.metaErrs.Err(), "incomplete view")
 	}
 
-	level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "duration_ms", time.Since(start).Milliseconds(), "cached", len(f.cached), "returned", len(metas), "partial", len(resp.partial))
+	level.Info(f.logger).Log("msg", "successfully synchronized block metadata", "duration", time.Since(start).String(), "duration_ms", time.Since(start).Milliseconds(), "cached", f.countCached(), "returned", len(metas), "partial", len(resp.partial))
 	return metas, resp.partial, nil
 }
 
+func (f *BaseFetcher) countCached() int {
+	f.mtx.Lock()
+	defer f.mtx.Unlock()
+
+	return len(f.cached)
+}
+
 type MetaFetcher struct {
 	wrapped *BaseFetcher
 	metrics *FetcherMetrics
```
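The three hunks above follow one pattern: every access to `BaseFetcher.cached`, including the `len()` used in the log line, now goes through the same mutex. Below is a minimal, self-contained sketch of that pattern; it is illustrative only, not Thanos code, and all names in it are made up.

```go
// Sketch: a map field that is only ever touched while holding the
// struct's mutex. The writer swaps in a freshly built map; readers
// (even a plain len()) take the same lock.
package main

import (
	"fmt"
	"sync"
)

type baseFetcher struct {
	mtx    sync.Mutex
	cached map[string]string
}

func (f *baseFetcher) swap(next map[string]string) {
	f.mtx.Lock()
	f.cached = next // pointer swap, guarded by mtx
	f.mtx.Unlock()
}

func (f *baseFetcher) countCached() int {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	return len(f.cached) // reads need the same lock as writes
}

func main() {
	f := &baseFetcher{cached: map[string]string{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			f.swap(map[string]string{fmt.Sprint(i): "meta.json"})
			_ = f.countCached()
		}(i)
	}
	wg.Wait()
	fmt.Println(f.countCached())
}
```

Running this with `go run -race` is clean; drop the locking and the race detector reports the concurrent map access that this PR fixes.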
```diff
@@ -711,7 +723,11 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met
 	}
 
 	for u, meta := range metas {
-		l := meta.Thanos.Labels
+		l := make(map[string]string)
+		for n, v := range meta.Thanos.Labels {
+			l[n] = v
+		}
+
 		for _, replicaLabel := range r.replicaLabels {
 			if _, exists := l[replicaLabel]; exists {
 				level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel)
```
```diff
@@ -723,7 +739,10 @@ func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*met
 			level.Warn(r.logger).Log("msg", "block has no labels left, creating one", r.replicaLabels[0], "deduped")
 			l[r.replicaLabels[0]] = "deduped"
 		}
-		metas[u].Thanos.Labels = l
+
+		nm := *meta
+		nm.Thanos.Labels = l
+		metas[u] = &nm
 	}
 	return nil
 }
```
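These two hunks stop `ReplicaLabelRemover` from mutating metadata that other goroutines may share: the label map is copied before anything is deleted from it, and a shallow copy of the meta struct is swapped into `metas` instead of writing through the shared pointer. A rough sketch of the same copy-on-write idea, using hypothetical types rather than the Thanos ones:

```go
// Sketch: remove a label without mutating state reachable through a
// shared pointer. Copy the map, shallow-copy the struct, swap the pointer.
package main

import "fmt"

type meta struct {
	Labels map[string]string
}

func dedupe(metas map[string]*meta, replicaLabel string) {
	for k, m := range metas {
		// Deleting from m.Labels directly would race with any reader
		// still holding the old *meta, so work on a copy.
		labels := make(map[string]string, len(m.Labels))
		for n, v := range m.Labels {
			labels[n] = v
		}
		delete(labels, replicaLabel)

		// Shallow-copy the struct, attach the fresh map, swap the pointer.
		nm := *m
		nm.Labels = labels
		metas[k] = &nm
	}
}

func main() {
	shared := &meta{Labels: map[string]string{"replica": "a", "cluster": "eu"}}
	metas := map[string]*meta{"block-1": shared}
	dedupe(metas, "replica")
	fmt.Println(shared.Labels)           // unchanged: map[cluster:eu replica:a]
	fmt.Println(metas["block-1"].Labels) // map[cluster:eu]
}
```

Note the struct copy is shallow: if the struct held further reference-typed fields they would still be shared. Here only the label map is rewritten, so copying just that map is sufficient.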
```diff
@@ -778,10 +797,12 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
 
 // Delay is not considered when computing DeletionMarkBlocks map.
 // Not go-routine safe.
 type IgnoreDeletionMarkFilter struct {
-	logger          log.Logger
-	delay           time.Duration
-	concurrency     int
-	bkt             objstore.InstrumentedBucketReader
-	deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
+	logger      log.Logger
+	delay       time.Duration
+	concurrency int
+	bkt         objstore.InstrumentedBucketReader
+
+	mtx             sync.Mutex
+	deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
 }
```
```diff
@@ -797,13 +818,23 @@ func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBuc
 
 // DeletionMarkBlocks returns block ids that were marked for deletion.
 func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark {
-	return f.deletionMarkMap
+	f.mtx.Lock()
+	defer f.mtx.Unlock()
+
+	deletionMarkMap := make(map[ulid.ULID]*metadata.DeletionMark)
+	for id, meta := range f.deletionMarkMap {
+		deletionMarkMap[id] = meta
+	}
+
+	return deletionMarkMap
 }
 
 // Filter filters out blocks that are marked for deletion after a given delay.
 // It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
 func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
+	f.mtx.Lock()
+	f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)
+	f.mtx.Unlock()
 
 	// Make a copy of block IDs to check, in order to avoid concurrency issues
 	// between the scheduler and workers.
```

Review comment on the map copy in `DeletionMarkBlocks`:

Reviewer: 👍🏽

Review thread on the `f.deletionMarkMap = make(...)` reset in `Filter`:

Reviewer: Why is that needed, do we access this map somewhere else? If that's the case, I think we have to build this map locally and only then swap it in in a locked fashion, right? Otherwise we will have "partial" results.

Author: It's being accessed in […]. Otherwise you're right, we can also build it separately and swap it in afterwards while holding the lock.

Reviewer: I am just assuming there is a reader somewhere else accessing the deletion map concurrently. For a short period it would have zero elements instead of 20,000 deleted items, and then be back to 20,000. I would need to check what exactly can go wrong with this, but it is just asking for problems here, no? (: Maybe now things would eventually heal, but tomorrow not. No one would assume that an internal map is flaky unless it's called […]. I would fix this before moving on.

Author: Thanks for clarifying, I see the point now 👍 I adjusted it, please let me know if it looks good now!
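The fix the reviewer asks for, building the new map off to the side and only swapping it in under the lock, could look roughly like the sketch below. This is an assumed shape, not the code from the later commits; the names are illustrative. Readers then observe either the complete old map or the complete new one, never an empty or half-filled intermediate.

```go
// Sketch: build-then-swap instead of reset-then-fill, so readers of the
// shared map never see partial results.
package main

import (
	"fmt"
	"sync"
)

type filter struct {
	mtx   sync.Mutex
	marks map[string]bool
}

func (f *filter) refresh(ids []string) {
	// Build the replacement map without holding the lock...
	next := make(map[string]bool, len(ids))
	for _, id := range ids {
		next[id] = true
	}

	// ...then publish it atomically with respect to anyone locking f.mtx.
	f.mtx.Lock()
	f.marks = next
	f.mtx.Unlock()
}

func main() {
	f := &filter{}
	f.refresh([]string{"01ABC", "01DEF"})
	fmt.Println(len(f.marks)) // single-goroutine read; no lock needed here
}
```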
```diff
@@ -813,9 +844,8 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
 	}
 
 	var (
-		eg  errgroup.Group
-		ch  = make(chan ulid.ULID, f.concurrency)
-		mtx sync.Mutex
+		eg errgroup.Group
+		ch = make(chan ulid.ULID, f.concurrency)
 	)
 
 	for i := 0; i < f.concurrency; i++ {
```
```diff
@@ -838,13 +868,13 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
 
 			// Keep track of the blocks marked for deletion and filter them out if their
 			// deletion time is greater than the configured delay.
-			mtx.Lock()
+			f.mtx.Lock()
 			f.deletionMarkMap[id] = m
 			if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
 				synced.WithLabelValues(MarkedForDeletionMeta).Inc()
 				delete(metas, id)
 			}
-			mtx.Unlock()
+			f.mtx.Unlock()
 		}
 
 		return lastErr
```
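These last two hunks replace the function-local `mtx` with the struct-level `f.mtx`, so the worker goroutines and outside readers such as `DeletionMarkBlocks` serialize on a single lock. A self-contained sketch of that fan-out shape, with illustrative names rather than the Thanos code:

```go
// Sketch: errgroup workers drain a channel and record results in a map
// guarded by the struct-level mutex, the same one external readers take.
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

type filter struct {
	mtx   sync.Mutex
	marks map[int]bool
}

func (f *filter) run(ids []int, concurrency int) error {
	var eg errgroup.Group
	ch := make(chan int, concurrency)

	for i := 0; i < concurrency; i++ {
		eg.Go(func() error {
			for id := range ch {
				// Same mutex that guards reads of f.marks elsewhere.
				f.mtx.Lock()
				f.marks[id] = true
				f.mtx.Unlock()
			}
			return nil
		})
	}

	for _, id := range ids {
		ch <- id
	}
	close(ch)

	return eg.Wait()
}

func main() {
	f := &filter{marks: map[int]bool{}}
	if err := f.run([]int{1, 2, 3}, 2); err != nil {
		panic(err)
	}
	fmt.Println(len(f.marks))
}
```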
Review comment:

deep copy! 🤗