Add taskList throttling to allow users to limit activities executed per second #432

Merged: 28 commits merged into master from tasklist on Jan 5, 2018

Conversation

madhuravi (Contributor)

Fixes #50

@coveralls

Coverage Status

Coverage increased (+0.08%) to 66.321% when pulling 24e626c on tasklist into cadabad on master.

func (rl *rateLimiter) shouldUpdate(maxDispatchPerSecond *float64) bool {
rl.RLock()
defer rl.RUnlock()
if maxDispatchPerSecond == nil {
Contributor

No need to put this check under the lock.

Contributor Author

Makes sense

rl.ttl.Reset(rl.ttlRaw)
return true
default:
return *maxDispatchPerSecond < *rl.maxDispatchPerSecond
Contributor

I think it is the only place that needs the lock.

if ok, _ := c.rateLimiter.TryConsume(1); !ok {
// Note: Is a better error counter more appropriate here? We might not want high sevs for this
c.metricsClient.IncCounter(scope, metrics.PollErrorsCounter)
return nil, createServiceBusyError("TaskList dispatch exceeded limit")
Contributor

We want to block consumers on long poll and return empty polls to maintain the configured rate. We don't want them to receive any busy errors.

Contributor Author

Good point. Fixed.

@coveralls

Coverage Status

Coverage increased (+0.1%) to 66.351% when pulling e7a1fbd on tasklist into cadabad on master.

e.taskListsLock.RUnlock()
return result, nil
}
e.taskListsLock.RUnlock()
mgr := newTaskListManager(e, taskList, e.config)
e.taskListsLock.Lock()
mgr := newTaskListManager(e, taskList, e.config, maxDispatchPerSecond)
Contributor

move this after your RUnlock() on line 178

Contributor Author

Looks like we had some dup code here in checking the map. Cleaning up.

return newTaskListManagerWithRateLimiter(e, taskList, config, rl)
}

// Only for tests
Contributor

Is this comment accurate? newTaskListManager uses this helper.

Contributor Author

This function is for use in tests, will clarify

@@ -323,6 +405,12 @@ func (c *taskListManagerImpl) getTask(ctx context.Context) (*getTaskResult, erro
}()
}

// Wait till long poll expiration for token. If token acquired, proceed.
if ok := c.rateLimiter.Consume(1, c.config.LongPollExpirationInterval); !ok {
// Note: Is a better error counter more appropriate here? We might not want high sevs for this
Contributor

yes, emit a metric to indicate the error type i.e. being rate limited

String() string
}

func newTaskListManager(e *matchingEngineImpl, taskList *taskListID, config *Config) taskListManager {
type rateLimiter struct {
maxDispatchPerSecond *float64
Contributor

common.TokenBucket doesn't support rates lower than one RPS, so there is no point in taking a float64. We likely want to support rates lower than one RPS, so consider switching to the golang.org/x/time/rate limiter.
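For reference, a minimal sketch (not part of the PR) of how the golang.org/x/time/rate limiter handles fractional rates; the 0.5 RPS value is only an illustration:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 0.5 events per second (one every two seconds), burst of 1; a rate
	// below one RPS, which common.TokenBucket cannot express.
	limiter := rate.NewLimiter(rate.Limit(0.5), 1)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	for i := 0; i < 2; i++ {
		if err := limiter.Wait(ctx); err != nil {
			fmt.Println("wait failed:", err)
			return
		}
		fmt.Println("token granted at", time.Now().Format(time.RFC3339))
	}
}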

Contributor Author

Great catch. Started out with a float assumption, but thinking about it a bit more, is there a real use case for wanting it?

maxDispatchPerSecond *float64
tokenBucket common.TokenBucket
ttl *time.Timer
sync.RWMutex
Contributor

By convention, anonymous members and the mutex that protects the whole struct go at the beginning of the struct.
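Applied to the struct shown in this diff, the suggested ordering would look roughly like this (a sketch only; the fields are the ones from the diff):

type rateLimiter struct {
	sync.RWMutex // protects all fields below
	maxDispatchPerSecond *float64
	tokenBucket          common.TokenBucket
	ttl                  *time.Timer
}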

@@ -323,6 +405,12 @@ func (c *taskListManagerImpl) getTask(ctx context.Context) (*getTaskResult, erro
}()
}

// Wait till long poll expiration for token. If token acquired, proceed.
if ok := c.rateLimiter.Consume(1, c.config.LongPollExpirationInterval); !ok {
Contributor

The caller's context deadline could technically be less than the LongPollExpirationInterval; this probably needs to be fixed later.

Contributor Author

Good catch, fixing.

}

func (rl *rateLimiter) Consume(count int, timeout time.Duration) bool {
rl.RLock()
Contributor

I don't like the fact that we hold the lock while we wait on a caller-provided timeout. I.e. if there are 20 goroutines, 19 blocked on Consume() and one blocked on UpdateMaxDispatch(), the latter can starve for a long time (and meanwhile, the context deadline could even be exceeded). Here is an alternative: don't hold the lock within the Consume / TryConsume methods; instead use an atomic.Value to hold the token bucket reference. Whenever you change the token bucket, just atomically set the Value to the new token bucket. That way, callers who hold the old token bucket will stick to the old limit, and newer callers will use the new bucket.
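A minimal sketch of the atomic.Value approach being suggested, with a placeholder bucket type standing in for common.TokenBucket (the names here are illustrative, not from the PR):

package main

import (
	"fmt"
	"sync/atomic"
)

// bucket is a stand-in for the real token bucket implementation.
type bucket struct{ rps float64 }

func (b *bucket) TryConsume(count int) bool {
	// placeholder logic; a real bucket tracks tokens over time
	return float64(count) <= b.rps
}

// rateLimiter publishes a whole new bucket on update instead of locking.
type rateLimiter struct {
	value atomic.Value // holds *bucket
}

func (rl *rateLimiter) UpdateMaxDispatch(rps float64) {
	// In-flight callers keep the old bucket; new callers see the new one.
	rl.value.Store(&bucket{rps: rps})
}

func (rl *rateLimiter) TryConsume(count int) bool {
	return rl.value.Load().(*bucket).TryConsume(count)
}

func main() {
	rl := &rateLimiter{}
	rl.UpdateMaxDispatch(10)
	fmt.Println(rl.TryConsume(1))
}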

Contributor Author

Interesting. I would say let's not optimize for that yet and keep it simple, changes are usually rare. Definitely something we can revisit if we hit performance issues.

Contributor (@venkat1109, Nov 28, 2017)

Madhu, this is more than just an optimization. It's a bit of correctness as well, in that we can block the caller for way more than the deadline on lock contention. With atomic.Value you will end up with exactly the same lines of code, so IMO let's just do it right the first time around and forget about this.

Contributor

And FYI, it's not about the "value changing": UpdateMaxDispatch is called regardless for every Poll() request.

Contributor Author

To clarify, it will not affect correctness. Remember that all Polls will converge on the same throttling value; if one Poll times out, the next one will update it. Also, it's not about lines of code: locks are far more common, easier to understand, and don't involve type casts. Good read: https://texlution.com/post/golang-lock-free-values-with-atomic-value/

Also, yes, UpdateMaxDispatch is called for every Poll, but the write lock is only acquired if the value is changing. Otherwise, only the read lock is acquired. Check the shouldUpdate function.
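A simplified sketch of that read-lock fast path (field names follow the diff above, but the TTL handling and exact comparison are omitted, so this is not the PR's exact code):

func (rl *rateLimiter) UpdateMaxDispatch(maxDispatchPerSecond *float64) {
	if !rl.shouldUpdate(maxDispatchPerSecond) {
		return // common case: value unchanged, only the read lock was taken
	}
	rl.Lock()
	defer rl.Unlock()
	// A real implementation would re-check here, since another goroutine may
	// have updated the value between the RUnlock and the Lock.
	rl.maxDispatchPerSecond = maxDispatchPerSecond
	rl.tokenBucket = newTokenBucket(*maxDispatchPerSecond) // hypothetical constructor
}

func (rl *rateLimiter) shouldUpdate(maxDispatchPerSecond *float64) bool {
	if maxDispatchPerSecond == nil {
		return false
	}
	rl.RLock()
	defer rl.RUnlock()
	return *maxDispatchPerSecond != *rl.maxDispatchPerSecond
}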

Contributor

I understand that locks are the more common / standard pattern, but atomic.Values aren't that complicated either. In this case it's clearly the right fit IMO. As for correctness, I understand polls will converge, but you will block the caller for more than the caller-provided timeout (deadline). Anyway, will leave it up to you.

"Also yes UpdateMaxDispatch is called for every Poll, but write locks are only acquired if the value is changing"

Yup, you are right.

type rateLimiter struct {
maxDispatchPerSecond *float64
tokenBucket common.TokenBucket
ttl *time.Timer
Contributor

Consider just using epoch time to check for TTL expiration instead of using a timer, i.e. store the expiration as now().nanos + ttl, and within UpdateMaxDispatch just check if time.Now() > expiration.

Timers use the global heap within the Go runtime (lock protected, AFAIK), i.e. more expensive than time.Now().
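A sketch of what the epoch-based check might look like (field names are assumed, and the caller is expected to hold the lock):

type rateLimiter struct {
	sync.RWMutex
	maxDispatchPerSecond *float64
	expiresAt            time.Time // replaces the *time.Timer
	ttl                  time.Duration
}

// ttlExpired and resetTTL are called with the lock held from UpdateMaxDispatch.
func (rl *rateLimiter) ttlExpired(now time.Time) bool {
	return now.After(rl.expiresAt)
}

func (rl *rateLimiter) resetTTL(now time.Time) {
	rl.expiresAt = now.Add(rl.ttl)
}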

Contributor Author

I would keep it simple here; I'm not sure the perf difference is significant enough to make it more complicated. Also, the timer uses channels internally, no lock as far as I can see: https://golang.org/src/time/sleep.go

Contributor

Madhu, storing the expiry as an epoch is a well-understood pattern on the server side. And given that this is just a couple of lines of code change and it buys us efficiency, why wait until there is a perf issue? Let's just do it and be done with it.

Contributor Author

Timer is the recommended pattern and simple/easy to understand. What efficiency does epoch buy us? What is the perf advantage here exactly?

Contributor

Recommended? See golang/go#15133.

@@ -42,20 +42,95 @@ const (
done time.Duration = -1
)

// NOTE: Is this good enough for stress tests?
const (
_maxDispatchDefault = 100000.0
Contributor

[ not a blocker ] consider renaming this to _taskListOutboundRPS for clarity

// NOTE: Is this good enough for stress tests?
const (
_maxDispatchDefault = 100000.0
_dispatchLimitTTL = time.Second
Contributor

[ not a blocker ] consider renaming this to _taskListRateLimiterTTL

@coveralls

Coverage Status

Coverage increased (+0.1%) to 66.363% when pulling 85fac83 on tasklist into cadabad on master.

@@ -625,6 +626,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
LeaseFailureCounter: {metricName: "lease.failures"},
ConditionFailedErrorCounter: {metricName: "condition-failed-errors"},
RespondQueryTaskFailedCounter: {metricName: "respond-query-failed"},
ThrottleErrorCounter: {metricName: "throttle.errors"},
Contributor

I think we shouldn't treat these like errors. What about throttle.empty-polls?

Contributor Author

Empty polls sounds misleading. It's useful to differentiate between a truly empty poll, where no error is returned, and the throttling limit being reached. I think it's still an error, but one we might not want in the same monitoring classification. On the server side this should not cause any tickets, but users should be able to set alerts on these. There could be potential issues if there are spikes in throttling errors.

c.metricsClient.IncCounter(scope, metrics.ThrottleErrorCounter)
msg := fmt.Sprintf("TaskList dispatch exceeded limit: %s", err.Error())
// TODO: It's the client that's busy, not the server. Doesn't fit into any other error.
return nil, createServiceBusyError(msg)
Contributor

I don't think we want to return a service busy error. Also, it is going to be returned even if there is no backlog, as we throttle before trying to match. From the poller's point of view it is ErrNoTasks.

Contributor Author

Same as above, I want to differentiate between truly no tasks vs. tasks being throttled. Either way I need a new thrift type so the user can react to it. ErrNoTasks is not a type; it just has a specific string message.

@coveralls

Coverage Status

Coverage increased (+0.2%) to 66.406% when pulling c93e34c on tasklist into cadabad on master.

e.taskListsLock.Lock()
if result, ok := e.taskLists[*taskList]; ok {
result.UpdateMaxDispatch(maxDispatchPerSecond)
Contributor

If we aren't going to switch to atomics, consider moving this outside the lock so that we don't block addition of new task lists when another task list is blocked on UpdateMaxDispatch for multiple seconds if/when a change in the poll limit is rolled out.

@@ -323,6 +409,16 @@ func (c *taskListManagerImpl) getTask(ctx context.Context) (*getTaskResult, erro
}()
}

// Wait till long poll expiration for token. If token acquired, proceed.
stopWatch := c.metricsClient.StartTimer(scope, metrics.PollThrottleLatency)
err := c.rateLimiter.Wait(ctx, c.config.LongPollExpirationInterval)
Contributor

I've just realized that this approach is not going to work for the following scenario:

  1. Rate limit of 10 polls per second.
  2. No tasks are scheduled for a minute.
  3. The rate limiter will let 600 pollers go through.
  4. Tasks are then scheduled at 100 per second.
  5. For 6 seconds, polls will return 100 tasks per second.
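One way to bound the accumulation described in this scenario is to cap the limiter's burst size: with golang.org/x/time/rate, unused tokens never pile up beyond the burst. This is a sketch of that general idea, not necessarily what the PR ended up doing:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Burst of 10 caps how many unused tokens can accumulate while idle.
	limiter := rate.NewLimiter(rate.Limit(10), 10)

	time.Sleep(2 * time.Second) // simulate a period with no pollers waiting

	granted := 0
	for limiter.Allow() { // drain whatever accumulated during the idle period
		granted++
	}
	// Prints 10, not 20, even though 2 seconds at 10 RPS "earned" 20 tokens.
	fmt.Println("tokens available after idle period:", granted)
}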

@coveralls

Coverage Status

Coverage increased (+0.3%) to 66.871% when pulling 2dff6f6 on tasklist into 26303d5 on master.

@coveralls

Coverage Status

Coverage increased (+0.3%) to 66.876% when pulling 2dff6f6 on tasklist into 26303d5 on master.

@coveralls

Coverage Status

Coverage increased (+0.4%) to 67.059% when pulling 623475c on tasklist into 26303d5 on master.

@coveralls

Coverage Status

Coverage increased (+0.4%) to 67.001% when pulling 744025f on tasklist into 26303d5 on master.

@coveralls

Coverage Status

Coverage increased (+0.2%) to 66.999% when pulling 46f4c3e on tasklist into 36c6f54 on master.

e.taskListsLock.Lock()
if result, ok := e.taskLists[*taskList]; ok {
result.UpdateMaxDispatch(maxDispatchPerSecond)
Contributor

I'm a little worried we are switching to a write lock on this hot path. Can we figure out a way that does not require acquiring the write lock on the top-level map for TaskListMgr?

Contributor Author

There is one way: RLock, check the map; if the entry is there, RUnlock and update the max dispatch. If it is not there, RUnlock, take the write lock, check the map again, and only if it is still missing, create and write the entry.

So we will have some duplicate code, since we check the map twice. That way, if the entry is already in the map, we only take the read lock. It makes the code uglier but avoids the write lock in the majority of cases.
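A minimal sketch of that double-checked pattern; the function name and the newTaskListManager signature are assumed, the other names mirror the diff:

// getOrCreateTaskListManager is a hypothetical helper illustrating the pattern.
func (e *matchingEngineImpl) getOrCreateTaskListManager(id taskListID, maxDispatch *float64) taskListManager {
	e.taskListsLock.RLock()
	if mgr, ok := e.taskLists[id]; ok {
		e.taskListsLock.RUnlock()
		mgr.UpdateMaxDispatch(maxDispatch) // common case: read lock only
		return mgr
	}
	e.taskListsLock.RUnlock()

	e.taskListsLock.Lock()
	defer e.taskListsLock.Unlock()
	// Re-check: another goroutine may have created the entry in between.
	if mgr, ok := e.taskLists[id]; ok {
		mgr.UpdateMaxDispatch(maxDispatch)
		return mgr
	}
	mgr := newTaskListManager(e, &id, e.config, maxDispatch) // signature assumed
	e.taskLists[id] = mgr
	return mgr
}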

Contributor

We already have that logic in GetTaskList. I'm not sure this new method is needed. If you move the logic of setting the throttling limit to GetTask, then we can simplify this code a lot.

Contributor Author

Removed the Update method, but FYI most of this code is still needed for task list creation.

@@ -341,10 +351,14 @@ pollLoop:
}

taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
var maxDispatch *float64
if req.PollRequest.TaskListMetadata != nil {
Contributor

nit: use "request.TaskListMetadata"

@@ -341,10 +351,14 @@ pollLoop:
}

taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
var maxDispatch *float64
if req.PollRequest.TaskListMetadata != nil {
maxDispatch = req.PollRequest.TaskListMetadata.MaxTasksPerSecond
Contributor

nit: use "request.TaskListMetadata"

type taskListManager interface {
Start() error
Stop()
AddTask(execution *s.WorkflowExecution, taskInfo *persistence.TaskInfo) error
GetTaskContext(ctx context.Context) (*taskContext, error)
SyncMatchQueryTask(ctx context.Context, queryTask *queryTaskInfo) error
CancelPoller(pollerID string)
UpdateMaxDispatch(maxDispatchPerSecond *float64)
Contributor

Why do we need a separate API for this? Can you pass in the additional parameter to GetTaskContext?

Contributor Author

The task list is constructed before getTaskContext. The max dispatch is a property of the task list, so it makes more sense to add it as part of the task list fetch.

Contributor Author

Done.

if !rsv.OK() {
scope := metrics.MatchingTaskListMgrScope
c.metricsClient.IncCounter(scope, metrics.AddThrottleCounter)
return nil, fmt.Errorf("cannot add to tasklist, limit exceeded")
Contributor

Can we define a specific error for this? We also need to make sure we don't return an error to the caller when this happens, and instead store the task to the database.

Contributor Author

Are you suggesting returning nil here and not an error at all? Or returning a specific error type that is handled in the handler code? For the second option, we would add a new error type in thrift, and Max was a bit determined that this should not be considered an error, which is why I also named the metric a counter instead of an error. Some history from a previous thread with Max, when I had used a ServiceBusyError classification for this:

mfateev (Nov 28, 2017): I don't think we want to return a service busy error. Also it is going to be returned even if there is no backlog, as we throttle before trying to match. From the poller's point of view it is ErrNoTasks.

madhuravi (Nov 28, 2017): Same as above, I want to differentiate between truly no tasks vs. tasks being throttled. Either way I need a new thrift type so the user can react to it. ErrNoTasks is not a type; it just has a specific string message.

Contributor Author

Pushed a change for this. Let me know what you think.

c.logger.Warn("Tasklist manager context is cancelled, shutting down")
break deliverBufferTasksLoop
}
c.logger.Warn("Unable to send tasks for poll, limit exceeded")
Contributor

I'm confused here. I thought Wait would block indefinitely until new tokens are available. What other error will cause Wait to unblock?

Contributor Author

It should really only be that error. I'm simply handling this scenario so as to not let it fall through.

func (c *taskListManagerImpl) getTasksPump() {
defer close(c.taskBuffer)
c.startWG.Wait()

go c.deliverBufferTasksForPoll()
Contributor

Do we need another goroutine per task list? We already have two.

Contributor Author

How come? I see two goroutines: one for the task pump and one to move tasks to poll, so it is two with this change. They do need to happen in parallel, so we can't bundle them into one goroutine.

Contributor Author

Spoke offline.

return nil, errPumpClosed
case result := <-c.tasksForPoll:
if result.syncMatch {
c.metricsClient.IncCounter(scope, metrics.PollSuccessWithSyncCounter)
Contributor

Remove any counters which are not used anymore.

Contributor Author

What is not used? We still want a metric for successful poll and whether it is due to sync match or not.

if !ok { // Task list getTasks pump is shutdown
break deliverBufferTasksLoop
}
c.tasksForPoll <- &getTaskResult{task: task}
Contributor

We should measure the perf impact of this change, considering we are now serializing all tasks through a non-buffered channel. Can you try to add a bench test to cover this scenario?

Contributor Author

Are you talking about the preference for buffered tasks over sync tasks? We discussed that and came to the conclusion that even before this change, we already preferred buffered tasks over sync tasks.

Contributor

I think it is going to prioritize sync match just because sync match uses multiple goroutines to call trySyncMatch while deliverBufferTasksToPoll is called from a single goroutine.

Contributor Author

Sync match is not done in multiple goroutines. Do you mean there are several parallel calls to AddTask, whereas only one goroutine per host adds the buffered tasks? That is true, and that behavior was true before this code change as well.

break deliverBufferTasksLoop
}
c.tasksForPoll <- &getTaskResult{task: task}
case <-c.deliverBufferShutdownCh:
Contributor

Looks like we have two separate ways to shut down this channel.

Contributor Author

Spoke offline, this is to satisfy existing tests. There is a comment as well.

@coveralls

Coverage Status

Coverage increased (+0.03%) to 66.484% when pulling 006f60c on tasklist into bb03ce5 on master.

@coveralls

Coverage Status

Coverage increased (+0.1%) to 66.619% when pulling e4debb3 on tasklist into aa29ebb on master.

@@ -50,6 +50,8 @@ const (
_defaultTaskDispatchRPSTTL = 60 * time.Second
)

var errAddTasklistThrottled = fmt.Errorf("cannot add to tasklist, limit exceeded")
Contributor

use errors.New
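For a fixed message there is nothing to format, so the suggestion amounts to:

import "errors"

var errAddTasklistThrottled = errors.New("cannot add to tasklist, limit exceeded")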

Contributor (@samarabbas) left a comment

Fix one minor comment and then you can merge it into master.

@madhuravi dismissed mfateev’s stale review on January 5, 2018 at 18:10

Addressed/fixed the comments.

@coveralls

Coverage Status

Coverage increased (+0.1%) to 66.608% when pulling 310dbdc on tasklist into aa29ebb on master.
