-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bucket verify: repair out of order labels #964
Changes from 5 commits
f0dfe27
f09d3f5
230bc7b
3f0d6cd
ed78bde
2f179ff
24173d3
0d39253
bfd44f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,10 +85,11 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name | |
var backupBkt objstore.Bucket | ||
if len(backupconfContentYaml) == 0 { | ||
if *repair { | ||
return errors.Wrap(err, "repair is specified, so backup client is required") | ||
return errors.Errorf("repair is specified, so backup client is required") | ||
} | ||
} else { | ||
backupBkt, err = client.NewBucket(logger, backupconfContentYaml, reg, name) | ||
// nil Prometheus registerer: don't create conflicting metrics | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a good solution. It's essentially as easy as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But also, not sure if it matters as it is only batch jobs, no one looks on metrics ;p There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, otherwise we register the same metrics twice. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, you don't: |
||
backupBkt, err = client.NewBucket(logger, backupconfContentYaml, nil, name) | ||
if err != nil { | ||
return err | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -531,7 +531,7 @@ func IgnoreDuplicateOutsideChunk(_ int64, _ int64, last *chunks.Meta, curr *chun | |||||
// the current one. | ||||||
if curr.MinTime != last.MinTime || curr.MaxTime != last.MaxTime { | ||||||
return false, errors.Errorf("non-sequential chunks not equal: [%d, %d] and [%d, %d]", | ||||||
last.MaxTime, last.MaxTime, curr.MinTime, curr.MaxTime) | ||||||
last.MinTime, last.MaxTime, curr.MinTime, curr.MaxTime) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wow! that was super confusing indeed, thanks for spotting! |
||||||
} | ||||||
ca := crc32.Checksum(last.Chunk.Bytes(), castagnoli) | ||||||
cb := crc32.Checksum(curr.Chunk.Bytes(), castagnoli) | ||||||
|
@@ -559,9 +559,9 @@ func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64, ignoreChk | |||||
var last *chunks.Meta | ||||||
|
||||||
OUTER: | ||||||
for _, c := range chks { | ||||||
for i := range chks { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very subtle. So we are remembering a pointer to the chunk Using just a slice index here allows us to correctly store a pointer to the item of the slice from the last iteration and compare that to the chunk in the current iteration. Otherwise, this code was removing all chunks in the series other than the first one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's document this :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is the right place to do so? Glad to do it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comments in the code. If that's not the best place, let me know. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. amazing, nice catch! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More like, why did the repair just lose all the data in by blocks? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very nice catch, the comment makes it clear when we read this in 3 months again :) 👍 |
||||||
for _, ignoreChkFn := range ignoreChkFns { | ||||||
ignore, err := ignoreChkFn(mint, maxt, last, &c) | ||||||
ignore, err := ignoreChkFn(mint, maxt, last, &chks[i]) | ||||||
if err != nil { | ||||||
return nil, errors.Wrap(err, "ignore function") | ||||||
} | ||||||
|
@@ -571,13 +571,18 @@ OUTER: | |||||
} | ||||||
} | ||||||
|
||||||
last = &c | ||||||
repl = append(repl, c) | ||||||
last = &chks[i] | ||||||
repl = append(repl, chks[i]) | ||||||
} | ||||||
|
||||||
return repl, nil | ||||||
} | ||||||
|
||||||
type seriesRepair struct { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not using just |
||||||
lset labels.Labels | ||||||
chks []chunks.Meta | ||||||
} | ||||||
|
||||||
// rewrite writes all data from the readers back into the writers while cleaning | ||||||
// up mis-ordered and duplicated chunks. | ||||||
func rewrite( | ||||||
|
@@ -605,17 +610,21 @@ func rewrite( | |||||
postings = index.NewMemPostings() | ||||||
values = map[string]stringset{} | ||||||
i = uint64(0) | ||||||
series = []seriesRepair{} | ||||||
) | ||||||
|
||||||
var lset labels.Labels | ||||||
var chks []chunks.Meta | ||||||
|
||||||
for all.Next() { | ||||||
var lset labels.Labels | ||||||
var chks []chunks.Meta | ||||||
id := all.At() | ||||||
|
||||||
if err := indexr.Series(id, &lset, &chks); err != nil { | ||||||
return err | ||||||
} | ||||||
// Make sure labels are in sorted order | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing trailing period for comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||||||
sort.Slice(lset, func(i, j int) bool { | ||||||
return lset[i].Name < lset[j].Name | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. labels.Labels already implements the sort.Interface https://github.com/prometheus/tsdb/blob/4b3a5ac5d36e5262d2656c8d149e137c2d1fab12/labels/labels.go#L39-L41 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Excellent. Thanks for that. I've updated the code to use |
||||||
}) | ||||||
for i, c := range chks { | ||||||
chks[i].Chunk, err = chunkr.Chunk(c.Ref) | ||||||
if err != nil { | ||||||
|
@@ -632,34 +641,48 @@ func rewrite( | |||||
continue | ||||||
} | ||||||
|
||||||
if err := chunkw.WriteChunks(chks...); err != nil { | ||||||
series = append(series, seriesRepair{ | ||||||
lset: lset, | ||||||
chks: chks, | ||||||
}) | ||||||
} | ||||||
|
||||||
if all.Err() != nil { | ||||||
return errors.Wrap(all.Err(), "iterate series") | ||||||
} | ||||||
|
||||||
// sort the series -- if labels moved around the ordering will be different | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's always keep comments a full sentence.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||||||
sort.Slice(series, func(i, j int) bool { | ||||||
return labels.Compare(series[i].lset, series[j].lset) < 0 | ||||||
}) | ||||||
|
||||||
// build new TSDB block | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wrong comment again (full sentence, please) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||||||
for _, s := range series { | ||||||
if err := chunkw.WriteChunks(s.chks...); err != nil { | ||||||
return errors.Wrap(err, "write chunks") | ||||||
} | ||||||
if err := indexw.AddSeries(i, lset, chks...); err != nil { | ||||||
if err := indexw.AddSeries(i, s.lset, s.chks...); err != nil { | ||||||
return errors.Wrap(err, "add series") | ||||||
} | ||||||
|
||||||
meta.Stats.NumChunks += uint64(len(chks)) | ||||||
meta.Stats.NumChunks += uint64(len(s.chks)) | ||||||
meta.Stats.NumSeries++ | ||||||
|
||||||
for _, chk := range chks { | ||||||
for _, chk := range s.chks { | ||||||
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) | ||||||
} | ||||||
|
||||||
for _, l := range lset { | ||||||
for _, l := range s.lset { | ||||||
valset, ok := values[l.Name] | ||||||
if !ok { | ||||||
valset = stringset{} | ||||||
values[l.Name] = valset | ||||||
} | ||||||
valset.set(l.Value) | ||||||
} | ||||||
postings.Add(i, lset) | ||||||
postings.Add(i, s.lset) | ||||||
i++ | ||||||
} | ||||||
if all.Err() != nil { | ||||||
return errors.Wrap(all.Err(), "iterate series") | ||||||
} | ||||||
|
||||||
s := make([]string, 0, 256) | ||||||
for n, v := range values { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: some linters throw errors on things like this, I prefer
errors.New
hereThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks.