Skip to content

Commit

Permalink
Make inmemPipeline check that it's not closed before sending RPCs
Browse files Browse the repository at this point in the history
This change fixes a race inside between the shutdownCh and the
consumerCh in the select statement of inmempipeLine.AppendEntries,
which could otherwise forward an RPC request to the consumer even if
the transport was closed with InmemTransport.Close().

See also #275 for a sample program that reproduces the race if ran
long enough. The same program doesn't fail anymore with this change
applied.
  • Loading branch information
freeekanayaka committed Feb 12, 2018
1 parent 7e9c478 commit e290521
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion inmem_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type inmemPipeline struct {

shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
shutdownLock sync.RWMutex
}

type inmemPipelineInflight struct {
Expand Down Expand Up @@ -288,6 +288,17 @@ func (i *inmemPipeline) AppendEntries(args *AppendEntriesRequest, resp *AppendEn
Command: args,
RespChan: respCh,
}

// Check if we have been already shutdown, otherwise the random choose
// made by select statement below might pick consumerCh even if
// shutdownCh was closed.
i.shutdownLock.RLock()
shutdown := i.shutdown
i.shutdownLock.RUnlock()
if shutdown {
return nil, ErrPipelineShutdown
}

select {
case i.peer.consumerCh <- rpc:
case <-timeout:
Expand Down

0 comments on commit e290521

Please sign in to comment.