Skip to content

Commit

Permalink
Merge pull request #1739 from ph/mutex-coordinator
Browse files Browse the repository at this point in the history
Protect access to policiesCanceller using a mutext
  • Loading branch information
ph authored Aug 15, 2022
2 parents 7f4576c + b56d37b commit 181295d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- Return a better error on enrolling and the Elasticsearch version is incompatible. {pull}1211[1211]
- Give a grace period when starting the unenroll monitor. {issue}1500[1500]
- Fixes a race condition between the unenroller goroutine and the main goroutine for the coordinator monitor. {issues}1738[1738]
==== New Features

Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/bulk/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ func TestCancelCtx(t *testing.T) {
},
}

_ = testlog.SetLogger(t)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_ = testlog.SetLogger(t)
ctx, cancelF := context.WithCancel(context.Background())

var wg sync.WaitGroup
Expand Down
19 changes: 17 additions & 2 deletions internal/pkg/coordinator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ type monitorT struct {
leadersIndex string
agentsIndex string

policies map[string]policyT
policiesCanceller map[string]context.CancelFunc
policies map[string]policyT

muPoliciesCanceller sync.Mutex
policiesCanceller map[string]context.CancelFunc
}

// NewMonitor creates a new coordinator policy monitor.
Expand Down Expand Up @@ -311,7 +313,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error {
if r.cord == nil {
// either failed to take leadership or lost leadership
delete(m.policies, r.id)

m.muPoliciesCanceller.Lock()
delete(m.policiesCanceller, r.id)
m.muPoliciesCanceller.Unlock()
} else {
m.policies[r.id] = r
}
Expand Down Expand Up @@ -396,6 +401,9 @@ func (m *monitorT) getIPs() ([]string, error) {
}

func (m *monitorT) rescheduleUnenroller(ctx context.Context, pt *policyT, p *model.Policy) {
m.muPoliciesCanceller.Lock()
defer m.muPoliciesCanceller.Unlock()

u := uuid.Must(uuid.NewV4())
l := m.log.With().Str(dl.FieldPolicyID, pt.id).Str("unenroller_uuid", u.String()).Logger()
unenrollTimeout := time.Duration(p.UnenrollTimeout) * time.Second
Expand All @@ -418,6 +426,13 @@ func (m *monitorT) rescheduleUnenroller(ctx context.Context, pt *policyT, p *mod
}
}

func (m *monitorT) ActivePoliciesCancellerCount() int {
m.muPoliciesCanceller.Lock()
defer m.muPoliciesCanceller.Unlock()

return len(m.policiesCanceller)
}

func runCoordinator(ctx context.Context, cord Coordinator, l zerolog.Logger, d time.Duration) {
cnt := 0
for {
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/coordinator/monitor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestMonitorUnenroller(t *testing.T) {
assert.NotEmpty(t, agent.UnenrolledAt)
assert.Equal(t, unenrolledReasonTimeout, agent.UnenrolledReason)
assert.Len(t, pm.(*monitorT).policies, 1)
assert.Len(t, pm.(*monitorT).policiesCanceller, 1)
assert.Equal(t, pm.(*monitorT).ActivePoliciesCancellerCount(), 1)

// should error as they are now invalidated
_, err = bulker.APIKeyAuth(bulkCtx, *accessKey)
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestMonitorUnenrollerSetAndClear(t *testing.T) {
assert.True(t, agent.Active)
// Make sure canceller is no longer there
assert.Len(t, pm.(*monitorT).policies, 1)
assert.Len(t, pm.(*monitorT).policiesCanceller, 0)
assert.Equal(t, pm.(*monitorT).ActivePoliciesCancellerCount(), 0)

}

Expand Down

0 comments on commit 181295d

Please sign in to comment.