Skip to content

Commit

Permalink
Merge pull request #1855 from aaronlehmann/revert-ca-server-changes
Browse files Browse the repository at this point in the history
Revert recent CA server changes
  • Loading branch information
aaronlehmann authored Jan 10, 2017
2 parents 69ea950 + 2eeaea9 commit 7dfafd8
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
29 changes: 16 additions & 13 deletions ca/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Server struct {
// renewal. They are indexed by node ID.
pending map[string]*api.Node

// started is a channel which gets closed once the server is running
// Started is a channel which gets closed once the server is running
// and able to service RPCs.
started chan struct{}
}
Expand Down Expand Up @@ -102,9 +102,10 @@ func (s *Server) NodeCertificateStatus(ctx context.Context, request *api.NodeCer
return nil, grpc.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
}

if err := s.isRunningLocked(); err != nil {
if err := s.addTask(); err != nil {
return nil, err
}
defer s.doneTask()

var node *api.Node

Expand Down Expand Up @@ -188,9 +189,10 @@ func (s *Server) IssueNodeCertificate(ctx context.Context, request *api.IssueNod
return nil, grpc.Errorf(codes.InvalidArgument, codes.InvalidArgument.String())
}

if err := s.isRunningLocked(); err != nil {
if err := s.addTask(); err != nil {
return nil, err
}
defer s.doneTask()

var (
blacklistedCerts map[string]*api.BlacklistedCertificate
Expand Down Expand Up @@ -405,8 +407,8 @@ func (s *Server) Run(ctx context.Context) error {
// returns true without joinTokens being set correctly.
s.mu.Lock()
s.ctx, s.cancel = context.WithCancel(ctx)
close(s.started)
s.mu.Unlock()
close(s.started)

if err != nil {
log.G(ctx).WithFields(logrus.Fields{
Expand Down Expand Up @@ -467,37 +469,38 @@ func (s *Server) Run(ctx context.Context) error {
// Stop stops the CA and closes all grpc streams.
func (s *Server) Stop() error {
s.mu.Lock()

// Wait for Run to complete before returning
defer s.wg.Wait()

defer s.mu.Unlock()

if !s.isRunning() {
s.mu.Unlock()
return errors.New("CA signer is already stopped")
}
s.cancel()
s.mu.Unlock()
// wait for all handlers to finish their CA deals,
s.wg.Wait()
s.started = make(chan struct{})
return nil
}

// Ready waits on the ready channel and returns when the server is ready to serve.
func (s *Server) Ready() <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
return s.started
}

func (s *Server) isRunningLocked() error {
func (s *Server) addTask() error {
s.mu.Lock()
if !s.isRunning() {
s.mu.Unlock()
return grpc.Errorf(codes.Aborted, "CA signer is stopped")
}
s.wg.Add(1)
s.mu.Unlock()
return nil
}

func (s *Server) doneTask() {
s.wg.Done()
}

func (s *Server) isRunning() bool {
if s.ctx == nil {
return false
Expand Down
1 change: 1 addition & 0 deletions manager/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type Dispatcher struct {
}

// New returns Dispatcher with cluster interface(usually raft.Node).
// NOTE: each handler which does something with raft must add to Dispatcher.wg
func New(cluster Cluster, c *Config) *Dispatcher {
d := &Dispatcher{
nodes: newNodeStore(c.HeartbeatPeriod, c.HeartbeatEpsilon, c.GracePeriodMultiplier, c.RateLimitPeriod),
Expand Down

0 comments on commit 7dfafd8

Please sign in to comment.