Skip to content

Commit

Permalink
Refactor context cancellation handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ironcladlou committed Nov 18, 2020
1 parent 47fb0b9 commit f266e2f
Showing 1 changed file with 27 additions and 21 deletions.
48 changes: 27 additions & 21 deletions etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNoLeader
}

cancelCtx, cancelFn := context.WithCancel(ss.Context())
monitorCtx := &leaderMonitoringContext{
Context: cancelCtx,
cancel: cancelFn,
}
cancelForLeaderLoss := context.CancelFunc(monitorCtx.CancelForLeaderLoss)
ss = serverStreamWithCtx{ctx: monitorCtx, cancel: &cancelForLeaderLoss, ServerStream: ss}
ctx := newCancellableContext(ss.Context())
ss = serverStreamWithCtx{ctx: newCancellableContext(ss.Context()), ServerStream: ss}

smap.mu.Lock()
smap.streams[ss] = struct{}{}
Expand All @@ -248,7 +243,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
smap.mu.Lock()
delete(smap.streams, ss)
smap.mu.Unlock()
monitorCtx.Cancel()
// TODO: investigate whether the reason for cancellation here is useful to know
ctx.Cancel(nil)
}()
}
}
Expand All @@ -257,30 +253,39 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
}
}

// leaderMonitoringContext wraps a context and provides a custom error when
// the CancelForLeaderLoss() method is used to cancel the context. This is
// so downstream context users can disambiguate the reason for the cancellation
// which could be from the client (for example) or from this interceptor code.
type leaderMonitoringContext struct {
// cancellableContext wraps a context with new cancellable context that allows a
// specific cancellation error to be preserved and later retrieved using the
// Context.Err() function. This is so downstream context users can disambiguate
// the reason for the cancellation which could be from the client (for example)
// or from this interceptor code.
type cancellableContext struct {
context.Context

lock sync.RWMutex
cancel context.CancelFunc
cancelReason error
}

func (c *leaderMonitoringContext) Cancel() {
c.cancel()
func newCancellableContext(parent context.Context) *cancellableContext {
ctx, cancel := context.WithCancel(parent)
return &cancellableContext{
Context: ctx,
cancel: cancel,
}
}

func (c *leaderMonitoringContext) CancelForLeaderLoss() {
// Cancel stores the cancellation reason and then delegates to context.WithCancel
// against the parent context.
func (c *cancellableContext) Cancel(reason error) {
c.lock.Lock()
c.cancelReason = rpctypes.ErrGRPCNoLeader
c.cancelReason = reason
c.lock.Unlock()
c.cancel()
}

func (c *leaderMonitoringContext) Err() error {
// Err will return the preserved cancel reason error if present, and will
// otherwise return the underlying error from the parent context.
func (c *cancellableContext) Err() error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.cancelReason != nil {
Expand All @@ -291,8 +296,9 @@ func (c *leaderMonitoringContext) Err() error {

type serverStreamWithCtx struct {
grpc.ServerStream
ctx context.Context
cancel *context.CancelFunc

// ctx is used so that we can preserve a reason for cancellation.
ctx *cancellableContext
}

func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
Expand Down Expand Up @@ -324,7 +330,7 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
smap.mu.Lock()
for ss := range smap.streams {
if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
(*ssWithCtx.cancel)()
ssWithCtx.ctx.Cancel(rpctypes.ErrGRPCNoLeader)
<-ss.Context().Done()
}
}
Expand Down

0 comments on commit f266e2f

Please sign in to comment.