Skip to content

Commit

Permalink
improve consistent read guarantees
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Nov 15, 2024
1 parent 329a84f commit cb0f511
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/experiment/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (m *Metastore) TransferLeadership() (err error) {
// CheckReady verifies if the metastore is ready to serve requests by
// ensuring the node is up-to-date with the leader's commit index.
func (m *Metastore) CheckReady(ctx context.Context) error {
if err := m.follower.WaitLeaderCommitIndexAppliedLocally(ctx); err != nil {
if _, err := m.follower.WaitLeaderCommitIndexAppliedLocally(ctx); err != nil {
return err
}
m.readyOnce.Do(func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/experiment/metastore/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (svc *MetadataQueryService) QueryMetadata(
resp, err = svc.listBlocksForQuery(ctx, tx, req)
}
if readErr := svc.follower.ConsistentRead(ctx, read); readErr != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Unavailable, readErr.Error())
}
return resp, err
}
Expand Down
64 changes: 50 additions & 14 deletions pkg/experiment/metastore/raft_node/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
)

var ErrConsistentRead = fmt.Errorf("consistent read failed")
var (
ErrConsistentRead = fmt.Errorf("consistent read failed")
ErrorAbortedByRestore = fmt.Errorf("aborted by restore")
)

type Follower struct {
client metastorev1.RaftNodeServiceClient
Expand All @@ -34,39 +37,72 @@ func NewFollower(client metastorev1.RaftNodeServiceClient, raft RaftNode, fsm FS
//
// The transaction passed to the provided function has access to the most up-to-date
// data, reflecting the updates from all prior write operations that were successful.
func (f *Follower) ConsistentRead(ctx context.Context, fn func(*bbolt.Tx)) (err error) {
if err = f.WaitLeaderCommitIndexAppliedLocally(ctx); err == nil {
err = f.fsm.Read(fn)
//
// If the function returns an error, it's guaranteed that the state has not been
// accessed. These errors can and should be retried on another follower.
//
// It's caller's responsibility to handle errors encountered while using the
// provided transaction, such as I/O errors or logical inconsistencies.
func (f *Follower) ConsistentRead(ctx context.Context, fn func(*bbolt.Tx)) error {
if err := f.consistentRead(ctx, fn); err != nil {
return fmt.Errorf("%w: %w", ErrConsistentRead, err)
}
return nil
}

func (f *Follower) consistentRead(ctx context.Context, fn func(*bbolt.Tx)) error {
applied, err := f.WaitLeaderCommitIndexAppliedLocally(ctx)
if err != nil {
return fmt.Errorf("%w: %w", ErrConsistentRead, err)
return err
}
return err
var readErr error
read := func(tx *bbolt.Tx) {
// Now that we've acquired access to the state after catch up with
// the leader, we can perform the read operation. However, there's a
// possibility that the FSM has been restored from a snapshot right
// after the index check and before the transaction begins. We perform
// the check again to detect this.
if f.raft.AppliedIndex() < applied {
readErr = ErrorAbortedByRestore
return
}
// It's guaranteed that the FSM has the most up-to-date state
// relative to the read time: any subsequent read will include
// the state we're accessing now.
fn(tx)
}
if err = f.fsm.Read(read); err != nil {
// The FSM might not be able to perform the read operation due to the
// underlying storage issues. In this case, we return the error before
// providing the transaction handle to the caller.
return err
}
return readErr
}

// WaitLeaderCommitIndexAppliedLocally ensures the node is up-to-date for read
// operations, providing linearizable read semantics. It calls metastore client
// ReadIndex and waits for the local applied index to catch up to the returned
// read index. This method should be used before performing local reads to ensure
// consistency.
func (f *Follower) WaitLeaderCommitIndexAppliedLocally(ctx context.Context) error {
func (f *Follower) WaitLeaderCommitIndexAppliedLocally(ctx context.Context) (uint64, error) {
r, err := f.client.ReadIndex(ctx, &metastorev1.ReadIndexRequest{})
if err != nil {
return err
return 0, err
}
if f.raft.AppliedIndex() >= r.ReadIndex {
return nil
if applied := f.raft.AppliedIndex(); applied >= r.ReadIndex {
return applied, nil
}
t := time.NewTicker(10 * time.Millisecond)
defer t.Stop()
for {
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-t.C:
if f.raft.AppliedIndex() >= r.ReadIndex {
return nil
if applied := f.raft.AppliedIndex(); applied >= r.ReadIndex {
return applied, nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
2 changes: 1 addition & 1 deletion pkg/experiment/metastore/tenant_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (svc *TenantService) GetTenant(
resp, err = svc.getTenantStats(req.TenantId, ctx)
}
if readErr := svc.follower.ConsistentRead(ctx, read); readErr != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Unavailable, readErr.Error())
}
return resp, err
}
Expand Down

0 comments on commit cb0f511

Please sign in to comment.