feat(v2): dlq recovery #3595
Conversation
func (m *Metastore) stopping(_ error) error {
	close(m.done)
	m.wg.Wait()
	m.dlq.Stop()
	return m.Shutdown()
}
Stop is called twice: here and in m.Shutdown, where I believe it should live. Let's remove it from here.
NB: I guess we should stop the DLQ before raft, just to minimize the chances of concurrent recovery.
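For illustration, a minimal sketch of the suggested shape, assuming Shutdown owns both the DLQ recovery handle and the raft instance (types and field names here are placeholders, not the actual metastore API):

package metastore

import (
	"sync"

	"github.com/hashicorp/raft"
)

// Illustrative types only; the real Metastore carries many more fields.
type dlqRecovery interface{ Stop() }

type Metastore struct {
	done chan struct{}
	wg   sync.WaitGroup
	dlq  dlqRecovery
	raft *raft.Raft
}

// stopping only signals and waits for background work.
func (m *Metastore) stopping(_ error) error {
	close(m.done)
	m.wg.Wait()
	return m.Shutdown()
}

// Shutdown owns the teardown order: DLQ recovery stops before raft,
// minimizing the chance of a recovery tick racing with raft shutdown.
func (m *Metastore) Shutdown() error {
	m.dlq.Stop()
	return m.raft.Shutdown().Error()
}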
👍 applied
const pathAnon = tenant.DefaultTenantID
const pathBlock = "block.bin"
const pathMetaPB = "meta.pb"
const pathDLQ = "dlq"
Let's find a place for the constants (we already have them defined in other places)
created a separate tiny package
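For illustration, such a package could be as small as this (the package name and constant names are assumptions, not necessarily what the PR settled on):

// Package block centralizes object-storage path components that were
// previously duplicated across packages.
package block

const (
	DirNameDLQ   = "dlq"
	FileNameMeta = "meta.pb"
	FileNameData = "block.bin"
)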
func (r *Recovery) recoverLoop(ctx context.Context) {
	ticker := time.NewTicker(r.cfg.Period)
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r.recoverTick(ctx)
		}
	}
}

func (r *Recovery) recoverTick(ctx context.Context) {
	err := r.bucket.Iter(ctx, pathDLQ, func(metaPath string) error {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		r.recover(ctx, metaPath)
		return nil
	}, objstore.WithRecursiveIter)
	if err != nil {
		level.Error(r.l).Log("msg", "failed to iterate over dlq", "err", err)
	}
}
NB: One danger I see is that we mix old and new blocks in compaction jobs, which may increase the read amplification factor. I think we need to implement some logic to prevent this – just one more rule for the compaction planner. It’s not really critical, as we don't expect this to happen frequently.
/cc @aleks-p
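A rough illustration of the kind of planner rule meant here, assuming jobs are planned from batches of block metadata that carry a creation timestamp (the type, field, and threshold are hypothetical):

package planner

import (
	"sort"
	"time"
)

// BlockMeta is an illustrative stand-in for the real block metadata.
type BlockMeta struct {
	ID        string
	CreatedAt time.Time
}

// maxJobTimeSpread is a hypothetical knob: blocks whose creation times
// differ by more than this are not planned into the same compaction job,
// so a late DLQ-recovered block is not mixed with much newer data.
const maxJobTimeSpread = 12 * time.Hour

// splitByAge groups blocks into job candidates whose creation times stay
// within maxJobTimeSpread of the oldest block in the group.
func splitByAge(blocks []BlockMeta) [][]BlockMeta {
	sort.Slice(blocks, func(i, j int) bool {
		return blocks[i].CreatedAt.Before(blocks[j].CreatedAt)
	})
	var jobs [][]BlockMeta
	var cur []BlockMeta
	for _, b := range blocks {
		if len(cur) > 0 && b.CreatedAt.Sub(cur[0].CreatedAt) > maxJobTimeSpread {
			jobs = append(jobs, cur)
			cur = nil
		}
		cur = append(cur, b)
	}
	if len(cur) > 0 {
		jobs = append(jobs, cur)
	}
	return jobs
}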
I'll add a little more context here.
Basically, when we try to add the same md entry twice, there are three cases (a rough sketch follows the list):
- We still hold the entry in memory in the metastore. That's simple: we just reject it.
- We've already compacted the block and we still have tombstone for it. That's simple: we just reject it.
- We've already compacted the block and removed it from object storage and don't have tombstone for it. Somewhat tricky – I propose the following:
- Handle the md entry as usual: add it to the index, add it to the compaction job, create a tombstone afterwards, etc. The entry is visible to readers.
- Ignore "object not found" in compactors and queriers (reflect it in logs and metrics). Clearly, even if the object is missing for any other reason, this is probably the best strategy; better than failing the whole query/job outright.
- Ensure that we only remove tombstone if the object is actually deleted from object storage.
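A rough sketch of those three cases as code, with hypothetical index and tombstone helpers (none of these names come from the actual codebase):

package metastore

import "errors"

var ErrBlockExists = errors.New("metastore: block entry already exists")

// Illustrative interfaces for the sketch only.
type blockIndex interface {
	Has(blockID string) bool
	Insert(blockID string) error
}

type tombstoneSet interface {
	Has(blockID string) bool
}

// addBlockEntry walks the three cases described above.
func addBlockEntry(idx blockIndex, ts tombstoneSet, blockID string) error {
	switch {
	case idx.Has(blockID):
		// Case 1: the entry is still held in memory; reject the duplicate.
		return ErrBlockExists
	case ts.Has(blockID):
		// Case 2: the block was already compacted and its tombstone is
		// still present; reject the duplicate.
		return ErrBlockExists
	default:
		// Case 3: compacted, deleted, tombstone gone. Treat it as a new
		// entry; compactors and queriers are expected to tolerate
		// "object not found" downstream.
		return idx.Insert(blockID)
	}
}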
79e57d2 to 31b3aab
LGTM
func isRaftLeadershipError(err error) bool {
	return errors.Is(err, raft.ErrLeadershipLost) ||
		errors.Is(err, raft.ErrNotLeader) ||
		errors.Is(err, raft.ErrLeadershipTransferInProgress) ||
		errors.Is(err, raft.ErrRaftShutdown)
}
nit: we could probably refactor out this function to the raftleader package
pyroscope/pkg/experiment/metastore/metastore_fsm.go, lines 251 to 256 in 363d453:

func shouldRetryCommand(err error) bool {
	return errors.Is(err, raft.ErrLeadershipLost) ||
		errors.Is(err, raft.ErrNotLeader) ||
		errors.Is(err, raft.ErrLeadershipTransferInProgress) ||
		errors.Is(err, raft.ErrRaftShutdown)
}
@@ -294,7 +294,7 @@ func assertChanReceived(t *testing.T, c chan struct{}, timeout time.Duration, msg
 	select {
 	case <-c:
 	case <-time.After(timeout):
-		t.Fatalf(msg)
+		t.Fatalf("%s", msg)
nit: I'm curious if a simple t.Fatal(msg) would do the trick
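For context, t.Fatal takes ...any, so passing msg directly works and also sidesteps go vet's warning about a non-constant format string, which is presumably what motivated the "%s" change. A minimal sketch of the helper with that variant (the package name is assumed):

package metastore_test

import (
	"testing"
	"time"
)

func assertChanReceived(t *testing.T, c chan struct{}, timeout time.Duration, msg string) {
	t.Helper()
	select {
	case <-c:
	case <-time.After(timeout):
		// t.Fatal formats its arguments like fmt.Println, so no format
		// directive is needed here.
		t.Fatal(msg)
	}
}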
Co-authored-by: Anton Kolesnikov <anton.e.kolesnikov@gmail.com>
No description provided.