Skip to content

Commit

Permalink
fix(v2): retry compaction polling requests on Raft leader errors (#3562)
Browse files Browse the repository at this point in the history
* Retry poll compaction job requests as well

* Move error checking elsewhere

* Move retryable error wrapping to applyCommand
  • Loading branch information
aleks-p authored Sep 19, 2024
1 parent b60259a commit 39981cb
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 25 deletions.
25 changes: 24 additions & 1 deletion pkg/experiment/metastore/metastore_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/raft"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1"
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/metastore/raftlogpb"
"github.com/grafana/pyroscope/pkg/util"
)
Expand Down Expand Up @@ -205,7 +208,8 @@ func applyCommand[Req, Resp proto.Message](
}
future = log.Apply(raw, timeout)
if err = future.Error(); err != nil {
return nil, resp, err
// todo (korniltsev) write a test to spawn multiple metastores and verify this error returned with correct details
return nil, resp, wrapRetryableErrorWithRaftDetails(err, log)
}
fsmResp := future.Response().(fsmResponse)
if fsmResp.msg != nil {
Expand All @@ -231,3 +235,22 @@ func marshallRequest[Req proto.Message](req Req) ([]byte, error) {
}
return raw, nil
}

// wrapRetryableErrorWithRaftDetails converts a retryable Raft error into a
// gRPC status error with code Unavailable. When the Raft instance reports a
// non-empty leader server ID, the ID is attached to the status as a
// typesv1.RaftDetails detail.
//
// A nil error, or an error for which shouldRetryCommand reports false, is
// returned unchanged.
func wrapRetryableErrorWithRaftDetails(err error, r *raft.Raft) error {
	if err == nil || !shouldRetryCommand(err) {
		return err
	}
	s := status.New(codes.Unavailable, err.Error())
	if _, serverID := r.LeaderWithID(); serverID != "" {
		// Only replace s when the details were attached successfully. Blindly
		// assigning the result of a failed WithDetails call would discard the
		// Unavailable status and could make this function return a nil error.
		withDetails, detailsErr := s.WithDetails(&typesv1.RaftDetails{Leader: string(serverID)})
		if detailsErr == nil {
			s = withDetails
		}
	}
	return s.Err()
}

// shouldRetryCommand reports whether err matches (via errors.Is) one of the
// Raft errors treated as retryable here: leadership lost, not leader,
// leadership transfer in progress, or Raft shutdown.
func shouldRetryCommand(err error) bool {
	switch {
	case errors.Is(err, raft.ErrLeadershipLost),
		errors.Is(err, raft.ErrNotLeader),
		errors.Is(err, raft.ErrLeadershipTransferInProgress),
		errors.Is(err, raft.ErrRaftShutdown):
		return true
	default:
		return false
	}
}
24 changes: 0 additions & 24 deletions pkg/experiment/metastore/metastore_state_add_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package metastore

import (
"context"
"errors"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"time"

"github.com/go-kit/log/level"
Expand All @@ -32,30 +28,10 @@ func (m *Metastore) AddBlock(_ context.Context, req *metastorev1.AddBlockRequest
_, resp, err := applyCommand[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse](m.raft, req, m.config.Raft.ApplyTimeout)
if err != nil {
_ = level.Error(m.logger).Log("msg", "failed to apply add block", "block_id", req.Block.Id, "shard", req.Block.Shard, "err", err)
if m.shouldRetryAddBlock(err) {
// todo (korniltsev) write a test to spawn multiple metastores and verify this error returned with correct details
return nil, m.retryableErrorWithRaftDetails(err)
}
}
return resp, err
}

// retryableErrorWithRaftDetails builds a gRPC status error with code
// Unavailable whose message is err.Error(). When m.raft reports a non-empty
// leader server ID, the ID is attached to the status as a typesv1.RaftDetails
// detail.
func (m *Metastore) retryableErrorWithRaftDetails(err error) error {
	s := status.New(codes.Unavailable, err.Error())
	if _, serverID := m.raft.LeaderWithID(); serverID != "" {
		// Only replace s when the details were attached successfully. Blindly
		// assigning the result of a failed WithDetails call would discard the
		// Unavailable status and could make this method return a nil error.
		withDetails, detailsErr := s.WithDetails(&typesv1.RaftDetails{Leader: string(serverID)})
		if detailsErr == nil {
			s = withDetails
		}
	}
	return s.Err()
}

// shouldRetryAddBlock reports whether err matches (via errors.Is) any of the
// Raft errors treated as retryable: leadership lost, not leader, leadership
// transfer in progress, or Raft shutdown.
func (m *Metastore) shouldRetryAddBlock(err error) bool {
	retryable := []error{
		raft.ErrLeadershipLost,
		raft.ErrNotLeader,
		raft.ErrLeadershipTransferInProgress,
		raft.ErrRaftShutdown,
	}
	for _, target := range retryable {
		if errors.Is(err, target) {
			return true
		}
	}
	return false
}

func (m *metastoreState) applyAddBlock(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) {
name, key := keyForBlockMeta(request.Block.Shard, "", request.Block.Id)
value, err := request.Block.MarshalVT()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func (m *Metastore) PollCompactionJobs(_ context.Context, req *compactorv1.PollC
"raft_last_index", m.raft.LastIndex(),
"raft_applied_index", m.raft.AppliedIndex())
_, resp, err := applyCommand[*compactorv1.PollCompactionJobsRequest, *compactorv1.PollCompactionJobsResponse](m.raft, req, m.config.Raft.ApplyTimeout)
if err != nil {
_ = level.Error(m.logger).Log("msg", "failed to apply poll compaction jobs", "raft_commit_index", m.raft.CommitIndex(), "err", err)
}
return resp, err
}

Expand Down

0 comments on commit 39981cb

Please sign in to comment.