Skip to content

Commit

Permalink
[manager/dispatcher] Use read-write lock for dispatcher context.
Browse files Browse the repository at this point in the history
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
  • Loading branch information
anshulpundir committed Mar 30, 2018
1 parent 831df67 commit 5d7f8da
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ type Dispatcher struct {
// has finished initializing the dispatcher.
wg sync.WaitGroup
// This RWMutex synchronizes RPC handlers and the dispatcher stop().
// The RPC handlers use the read lock while stop() uses the write lock
// Used to serialize read-write access to the dispatcher context.
// Also, the RPC handlers use the read lock while stop() uses the write lock
// and acts as a barrier to shutdown.
rpcRW sync.RWMutex
nodes *nodeStore
Expand Down Expand Up @@ -265,12 +266,15 @@ func (d *Dispatcher) Run(ctx context.Context) error {
d.lastSeenManagers = getWeightedPeers(d.cluster)

defer cancel()
d.ctx, d.cancel = context.WithCancel(ctx)
ctx = d.ctx
d.wg.Add(1)
defer d.wg.Done()
d.mu.Unlock()

d.rpcRW.Lock()
d.ctx, d.cancel = context.WithCancel(ctx)
ctx = d.ctx
d.rpcRW.Unlock()

publishManagers := func(peers []*api.Peer) {
var mgrs []*api.WeightedPeer
for _, p := range peers {
Expand Down Expand Up @@ -333,14 +337,14 @@ func (d *Dispatcher) Stop() error {

log := log.G(d.ctx).WithField("method", "(*Dispatcher).Stop")
log.Info("dispatcher stopping")
d.cancel()
d.mu.Unlock()

// The active nodes list can be cleaned out only when all
// existing RPCs have finished.
// RPCs that start after rpcRW.Unlock() should find the context
// cancelled and should fail organically.
d.rpcRW.Lock()
d.cancel()
d.nodes.Clean()
d.downNodes.Clean()
d.rpcRW.Unlock()
Expand All @@ -364,14 +368,14 @@ func (d *Dispatcher) Stop() error {
return nil
}

func (d *Dispatcher) isRunningLocked() (context.Context, error) {
d.mu.Lock()
// context returns the dispatcher context.
func (d *Dispatcher) context() (context.Context, error) {
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()
if !d.isRunning() {
d.mu.Unlock()
return nil, status.Errorf(codes.Aborted, "dispatcher is stopped")
}
ctx := d.ctx
d.mu.Unlock()
return ctx, nil
}

Expand Down Expand Up @@ -510,7 +514,7 @@ func nodeIPFromContext(ctx context.Context) (string, error) {
func (d *Dispatcher) register(ctx context.Context, nodeID string, description *api.NodeDescription) (string, error) {
logLocal := log.G(ctx).WithField("method", "(*Dispatcher).register")
// prevent register until we're ready to accept it
dctx, err := d.isRunningLocked()
dctx, err := d.context()
if err != nil {
return "", err
}
Expand Down Expand Up @@ -565,7 +569,7 @@ func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStat
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
dctx, err := d.context()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -759,7 +763,7 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
dctx, err := d.context()
if err != nil {
return err
}
Expand Down Expand Up @@ -885,7 +889,7 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
dctx, err := d.context()
if err != nil {
return err
}
Expand Down Expand Up @@ -1080,7 +1084,7 @@ func (d *Dispatcher) moveTasksToOrphaned(nodeID string) error {
func (d *Dispatcher) markNodeNotReady(id string, state api.NodeStatus_State, message string) error {
logLocal := log.G(d.ctx).WithField("method", "(*Dispatcher).markNodeNotReady")

dctx, err := d.isRunningLocked()
dctx, err := d.context()
if err != nil {
return err
}
Expand Down Expand Up @@ -1190,7 +1194,7 @@ func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_Sessio
d.rpcRW.RLock()
defer d.rpcRW.RUnlock()

dctx, err := d.isRunningLocked()
dctx, err := d.context()
if err != nil {
return err
}
Expand Down

0 comments on commit 5d7f8da

Please sign in to comment.