Skip to content

Commit

Permalink
Rename WrapApply to Apply.
Browse files Browse the repository at this point in the history
  • Loading branch information
ptabor committed May 13, 2022
1 parent f280447 commit f28bfa4
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 18 deletions.
11 changes: 6 additions & 5 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type RaftStatusGetter interface {
Term() uint64
}

type ApplyResult struct {
type Result struct {
Resp proto.Message
Err error
// Physc signals the physical effect of the request has completed in addition
Expand All @@ -62,12 +62,13 @@ type ApplyResult struct {
Trace *traceutil.Trace
}

type ApplyFunc func(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult
type applyFunc func(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result

// applierV3 is the interface for processing V3 raft messages
type applierV3 interface {
WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *ApplyResult
//Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult
// Apply executes the generic portion of application logic for the current applier, but
// delegates the actual execution to the applyFunc method.
Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result

Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
Expand Down Expand Up @@ -151,7 +152,7 @@ func newApplierV3Backend(
txnModeWriteWithSharedBuffer: txnModeWriteWithSharedBuffer}
}

func (a *applierV3backend) WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *ApplyResult {
func (a *applierV3backend) Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result {
return applyFunc(ctx, r, shouldApplyV3)
}

Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/apply/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3, lessor lease.Lessor) *a
return &authApplierV3{applierV3: base, as: as, lessor: lessor}
}

func (aa *authApplierV3) WrapApply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc ApplyFunc) *ApplyResult {
func (aa *authApplierV3) Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result {
aa.mu.Lock()
defer aa.mu.Unlock()
if r.Header != nil {
Expand All @@ -56,10 +56,10 @@ func (aa *authApplierV3) WrapApply(ctx context.Context, r *pb.InternalRaftReques
if err := aa.as.IsAdminPermitted(&aa.authInfo); err != nil {
aa.authInfo.Username = ""
aa.authInfo.Revision = 0
return &ApplyResult{Err: err}
return &Result{Err: err}
}
}
ret := aa.applierV3.WrapApply(ctx, r, shouldApplyV3, applyFunc)
ret := aa.applierV3.Apply(ctx, r, shouldApplyV3, applyFunc)
aa.authInfo.Username = ""
aa.authInfo.Revision = 0
return ret
Expand Down
12 changes: 6 additions & 6 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

type UberApplier interface {
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult
Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result
}

type uberApplier struct {
Expand Down Expand Up @@ -107,20 +107,20 @@ func (a *uberApplier) restoreAlarms() {
}
}

func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult {
// We first execute chain of WrapApply() calls down the hierarchy:
func (a *uberApplier) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
// We first execute chain of Apply() calls down the hierarchy:
// (i.e. CorruptApplier -> CappedApplier -> Auth -> Quota -> Backend),
// then dispatch() unpacks the request to a specific method (like Put),
// that gets executed down the hierarchy again:
// i.e. CorruptApplier.Put(CappedApplier.Put(...(BackendApplier.Put(...)))).
return a.applyV3.WrapApply(context.TODO(), r, shouldApplyV3, a.dispatch)
return a.applyV3.Apply(context.TODO(), r, shouldApplyV3, a.dispatch)
}

// dispatch translates the request (r) into appropriate call (like Put) on
// the underlying applyV3 object.
func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *ApplyResult {
func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *Result {
op := "unknown"
ar := &ApplyResult{}
ar := &Result{}
defer func(start time.Time) {
success := ar.Err == nil || ar.Err == mvcc.ErrCompacted
txn.ApplySecObserve(v3Version, op, success, time.Since(start))
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1829,7 +1829,7 @@ func (s *EtcdServer) apply(
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
applyV3Performed := false
var ar *apply.ApplyResult
var ar *apply.Result
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,7 @@ func TestPublishV3(t *testing.T) {
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- &apply2.ApplyResult{}
ch <- &apply2.Result{}
w := wait.NewWithResponse(ch)
ctx, cancel := context.WithCancel(context.Background())
lg := zaptest.NewLogger(t)
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e
return nil
}

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.ApplyResult, error) {
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*apply2.Result, error) {
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {
Expand Down Expand Up @@ -709,7 +709,7 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In

select {
case x := <-ch:
return x.(*apply2.ApplyResult), nil
return x.(*apply2.Result), nil
case <-cctx.Done():
proposalsFailed.Inc()
s.w.Trigger(id, nil) // GC wait
Expand Down

0 comments on commit f28bfa4

Please sign in to comment.