Skip to content

Commit

Permalink
Fix race condition in test
Browse files Browse the repository at this point in the history
  • Loading branch information
madhuravi committed Nov 27, 2017
1 parent 83cb406 commit a6e85b5
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
19 changes: 11 additions & 8 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,10 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
s.matchingEngine.config.RangeSize = rangeSize // override to low number for the test

dispatchTTL := time.Nanosecond
dPtr := _maxDispatchDefault
mgr := newTaskListManagerWithRateLimiter(
s.matchingEngine, tlID, s.matchingEngine.config,
newRateLimiter(&_maxDispatchDefault, dispatchTTL),
newRateLimiter(&dPtr, dispatchTTL),
)
s.matchingEngine.updateTaskList(tlID, mgr)
s.taskManager.getTaskListManager(tlID).rangeID = initialRangeID
Expand Down Expand Up @@ -498,15 +499,14 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
}, nil)

zeroDispatchCt := 0
var maxDispatch float64
for i := int64(0); i < taskCount; i++ {
scheduleID := i * 3

var wg sync.WaitGroup

var result *workflow.PollForActivityTaskResponse
var pollErr error
maxDispatch = float64(i)
maxDispatch := float64(i)
if i%2 == 0 {
maxDispatch = 0
}
Expand Down Expand Up @@ -619,9 +619,10 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
tlID := &taskListID{domainID: domainID, taskListName: tl, taskType: persistence.TaskListTypeActivity}
dispatchTTL := time.Nanosecond
s.matchingEngine.config.RangeSize = rangeSize // override to low number for the test
dPtr := _maxDispatchDefault
mgr := newTaskListManagerWithRateLimiter(
s.matchingEngine, tlID, s.matchingEngine.config,
newRateLimiter(&_maxDispatchDefault, dispatchTTL),
newRateLimiter(&dPtr, dispatchTTL),
)
s.matchingEngine.updateTaskList(tlID, mgr)
s.taskManager.getTaskListManager(tlID).rangeID = initialRangeID
Expand Down Expand Up @@ -685,12 +686,12 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
Identity: &identity,
})}
}, nil)
var maxDispatch float64
var throttleMu sync.Mutex
throttleCt := 0
for p := 0; p < workerCount; p++ {
go func() {
go func(wNum int) {
for i := int64(0); i < taskCount; {
maxDispatch = dispatchLimitFn(p, i)
maxDispatch := dispatchLimitFn(wNum, i)
result, err := s.matchingEngine.PollForActivityTask(s.callContext, &matching.PollForActivityTaskRequest{
DomainUUID: common.StringPtr(domainID),
PollRequest: &workflow.PollForActivityTaskRequest{
Expand All @@ -701,7 +702,9 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
})
if err != nil {
s.Contains(err.Error(), "ServiceBusyError")
throttleMu.Lock()
throttleCt++
throttleMu.Unlock()
i++
continue
}
Expand Down Expand Up @@ -729,7 +732,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
i++
}
wg.Done()
}()
}(p)
}
wg.Wait()
totalTasks := int(taskCount) * workerCount
Expand Down
5 changes: 3 additions & 2 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
)

// NOTE: Is this good enough for stress tests?
var (
const (
_maxDispatchDefault = 100000.0
_dispatchLimitTTL = time.Second
)
Expand Down Expand Up @@ -117,8 +117,9 @@ func (rl *rateLimiter) Consume(count int, timeout time.Duration) bool {
func newTaskListManager(
e *matchingEngineImpl, taskList *taskListID, config *Config, maxDispatchPerSecond *float64,
) taskListManager {
dPtr := _maxDispatchDefault
if maxDispatchPerSecond == nil {
maxDispatchPerSecond = &_maxDispatchDefault
maxDispatchPerSecond = &dPtr
}
rl := newRateLimiter(maxDispatchPerSecond, _dispatchLimitTTL)
return newTaskListManagerWithRateLimiter(e, taskList, config, rl)
Expand Down

0 comments on commit a6e85b5

Please sign in to comment.