
Research adding concurrency support to Task Manager #71441

Closed · mikecote opened this issue Jul 13, 2020 · 31 comments

Labels: (Deprecated) Feature:Reporting (use Reporting:Screenshot, Reporting:CSV, or Reporting:Framework instead), Feature:Task Manager, Team:ResponseOps (label for the ResponseOps team, formerly the Cases and Alerting teams)

Comments

@mikecote
Contributor

Research approaches to satisfy #54916 in order to come up with a level of effort and a timeline.

mikecote added the Feature:Task Manager and Team:ResponseOps labels on Jul 13, 2020
@elasticmachine
Contributor

Pinging @elastic/kibana-alerting-services (Team:Alerting Services)

@joelgriffith
Contributor

Just to follow up: having some kind of pre-check hook would also work, and give plugins a way to retry at a later time. This way you all don't have to implement the logic to limit concurrency, but each Task could offer a way to limit it via a hook.

If a hook is defined, then the retry behavior can be part of the hook's return value:

// Pretty quick pre-hook example: check a lock document before running.
// (`elasticsearch.get` stands in for whatever client the plugin has in scope.)
preHook: async () => {
  const anotherTaskIsRunning = await elasticsearch.get('.reporting-schedule-lock');
  if (!anotherTaskIsRunning) {
    return { available: true };
  }
  // Not available right now; ask Task Manager to retry in 30 seconds.
  return { available: false, retryIn: 30000 };
};

@pmuellr
Member

pmuellr commented Jul 14, 2020

The hook idea is interesting, being more dynamic than say a fixed concurrency value. Presumably the hook would be passed a bunch of data already retrieved about currently running and queued tasks (no more i/o in here please! :-) ).

I think your example ^^^ (based on a side chat) is a hook run AFTER the task has been read from task manager. My comment in that conversation is that it would be optimal to avoid reading such task types at all, since we just read a task that we're not going to run (wasted task slot) and we have to write it back out. ETOOMUCHIO heh. But ... we don't know just yet.

@joelgriffith
Contributor

is a hook run AFTER the task has been read from task manager

I'd assume so, as sort of a pre-check before the task actually begins execution. Apologies for that - I'm not intimately aware of how the internals work, or even how you serialize/store the tasks themselves and all the sandboxing/processing of them to make sure they're "safe".

Anyway, I don't mean to expand the surface area of what we're trying to do here; I just thought I'd offer an option that might be cheaper/better/faster to implement. If concurrency is more straightforward, then by all means let's go with that.

@tsullivan
Member

tsullivan commented Jul 14, 2020

I still like the hook idea. I think it would help to have an architecture where each registered task is responsible for determining whether it is available at the time.

Could there be a way for task manager to call a hook from each registered task type BEFORE querying the task manager index for pending tasks? I understand there is an update-by-query that claims tasks. The query could be composed of multiple OR filters to include only the "available" task types. Whether a task type is available should be controlled by the plugin that registers the task.

That idea helps solve the "Ability to set concurrency to 0" problem: if a hook says the task type is not available, then its concurrency is 0. To support a concurrency of 1 or a different number, the information coming out of the hook will have to be able to set the search size for the max number of tasks that can be in the search results for a particular task type.
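For illustration, a minimal sketch of how such a claim filter could be composed, assuming a per-type availability hook (the hook interface and helper below are hypothetical; real claiming also considers runAt, retryAt, etc.):

// Hypothetical shape of a per-type availability hook; not an existing API.
interface TaskTypeAvailability {
  isAvailable(): boolean;
}

// Compose the claiming query so it only matches task types that currently
// report themselves as available (simplified to status + type filters).
function buildClaimFilter(definitions: Map<string, TaskTypeAvailability>) {
  const availableTypes = [...definitions.entries()]
    .filter(([, definition]) => definition.isAvailable())
    .map(([type]) => type);

  return {
    bool: {
      filter: [
        { terms: { 'task.taskType': availableTypes } },
        { term: { 'task.status': 'idle' } },
      ],
    },
  };
}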

@gmmorris
Contributor

gmmorris commented Jul 21, 2020

I've begun looking into this and have a few ideas.

It's impossible for us to limit the number of tasks of a specific type returned by the update-by-query, but we can tell the query to only return specific types, so we could omit a type that we do not have capacity for.

My current thought process is that if TaskManager knows how many tasks of a certain type are running and what the max concurrency of that type is, we can do a few things:

  1. Omit the type from the update-by-query if it is already "at capacity" for that type. ⋆
  2. Once results are returned and pushed into the queue, we pull tasks off of that queue by taking tokens off of the semaphore so that we never have more of a certain type in play than we are allowed to run. ⋆⋆
  3. When a task finishes running, it credits the specific type's semaphore and that will allow TM to pull in the next item without needing to go back to ES. This will align well with: Eliminate the downtime between tasks completing and the next polling interval #65552

⋆ This would require checking whether the type can be run before each polling stage, which would be easier and faster with an internal semaphore per type than handing off to an async handler in the TaskTypeDefinition (which also opens us up to potential issues where one TaskType holds up the rest).

⋆⋆ If this is a local semaphore we can do smart things like query for more items than our max_workers if a certain type is at capacity, which will prevent a build up of a certain type from clogging up the Task Manager. If we need to keep asking a TaskType then this is harder and, again, more open to potential blocking.
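For illustration, a rough sketch of the internal per-type bookkeeping described above, assuming a simple counting semaphore (names are illustrative, not actual Task Manager internals):

// Illustrative counting semaphore per task type; not the actual Task Manager code.
class TaskTypeSemaphore {
  private running = 0;

  constructor(private readonly maxConcurrency: number) {}

  // Checked before each polling stage: a type at capacity is omitted from the
  // update-by-query (point 1 above).
  hasCapacity(): boolean {
    return this.running < this.maxConcurrency;
  }

  // Take a token before handing a claimed task to a worker (point 2 above).
  tryAcquire(): boolean {
    if (!this.hasCapacity()) {
      return false;
    }
    this.running++;
    return true;
  }

  // Credit the semaphore when a task finishes so the next queued item can be
  // pulled in without going back to ES (point 3 above).
  release(): void {
    this.running = Math.max(0, this.running - 1);
  }
}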

I like the idea of task types being responsible for telling TaskManager whether they can handle any more, but I'm not sure it's worth the added complexity. If we allowed the type to manage this, then it would still have to give Task Manager a number of "available slots", rather than true/false (which would always mean 1 or 0), and I think we'd have to make it synchronous.

Additionally, my feeling is that if we maintain this semaphore in TaskManager we can provide better observability over TaskManager's queue and gain a better understanding of why it's doing what it's doing.
Adding this smart queue and semaphore makes its queuing mechanism far more complicated, and I'd feel more comfortable if we could keep it more internal (at least as a starting point, so we can measure the impact of making it external).

@pmuellr
Member

pmuellr commented Jul 21, 2020

Steps 1-3 ^^^ LGTM. A clarity note on 3: when a task with type X finishes, I think we'd want to look for the next available task to run, independent of type, as long as there is capacity for that type. I don't think, e.g., that when a task with type X finishes we'd only look for the next task of type X. 🤔 ...

@gmmorris
Contributor

Oh yeah, I just meant it would release that type in the semaphore, not that it would take the next item of the same type - just the next one in the queue. 👍

@gmmorris
Contributor

A couple of concerns that this approach could introduce:
Let's say, hypothetically, we have the Reporting tasks configured to a maxConcurrency of 1 (so it can only run 1 reporting task), and the Kibana instance has a max_workers of 30.

Concern: If a Kibana has no reporting tasks in its queue and its query returns 30 reporting tasks, the task queue could be clogged up by Reporting tasks and a backlog of Reports will accumulate.

Remediation: There are a couple of things we can do that will balance this behaviour out, I believe:

  1. When there are tasks of a certain type in the queue, those will count against the remaining value in the semaphore (in this case 0 in the semaphore and 29 in the queue), and we would omit those types from subsequent queries, so over time these will get worked through and no more Reporting tasks will get pulled in until the queued tasks are completed.
  2. Within a few minutes the timeout on the task claiming in ES will pass and other Kibana instances will be able to pick up those tasks, claiming them as their own. When the initial Kibana tries to pick these up and mark them as running, a version conflict will occur, that task will be thrown out, and the next item in the queue will be picked up.
  3. My main concern is a backlog where the query doesn't claim new items because of a backlog of tasks that can't be picked up due to concurrency concerns. To address this we can query for more items than we have max_workers for (a concept we already plan on introducing in Eliminate the downtime between tasks completing and the next polling interval #65552), and specifically, we can count how many tasks we have blocked over concurrency concerns and count those not as the number of tasks but rather as the permitted concurrency (in Reporting's case, 1). That way a backlog of 29 Reporting tasks will only ever count as 1 worker when calculating the availableWorkers that feeds into the query.
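To make point 3 concrete, a hedged sketch of how availableWorkers might be calculated if blocked tasks count as their type's permitted concurrency rather than their raw count (the function and the task type name are illustrative):

// Illustrative only: queued-but-blocked tasks cost at most their type's
// permitted concurrency when deciding how many new tasks to claim.
interface BlockedByType {
  taskType: string;
  queued: number;         // tasks of this type sitting in the local queue
  maxConcurrency: number; // permitted concurrency for this type
}

function availableWorkers(maxWorkers: number, running: number, blocked: BlockedByType[]): number {
  const blockedCost = blocked.reduce(
    (sum, { queued, maxConcurrency }) => sum + Math.min(queued, maxConcurrency),
    0
  );
  return Math.max(0, maxWorkers - running - blockedCost);
}

// A backlog of 29 Reporting tasks (maxConcurrency 1) only costs 1 worker:
// availableWorkers(30, 0, [{ taskType: 'report:execute', queued: 29, maxConcurrency: 1 }]) === 29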

@tsullivan
Member

tsullivan commented Jul 31, 2020

Hi @gmmorris I want to go back to:

It's impossible for us to limit the number of tasks of a specific type returned by the update-by-query,

I did some research into Elasticsearch queries to see if there's a way to filter or specify a range for the count of documents in a leaf of a compound query. That's another way of saying what I attempted above in my comment: "set the search size for the max number of tasks that can be in the search results for a particular task type." I've looked quite a bit and have not found a solution, so I'm continuing to reach out.

If Task Manager could access an ES feature in the update_by_query, I really think that's the ideal way to go.

Second, I have a big question about:

  1. Once results are returned and pushed into the queue, we pull tasks off of that queue by taking tokens off of the semaphore so that we never have more of a certain type in play than we are allowed to run. ⋆⋆

Correct me if I'm wrong, but since the results come from update_by_query, they've already been modified in ES. The update has a script to modify the task status to claiming for each hit in the result. That would be avoided if we could limit the count of each task type in the query itself.

@tsullivan
Member

tsullivan commented Jul 31, 2020

@gmmorris @pmuellr what if task types that require nonconcurrency poll for work in a separate loop? Each one of these task types would need their own loop, probably. In the queries that poll for nonconcurrent work, we could have an overall size limit of returned docs set to 1.

@pmuellr
Member

pmuellr commented Aug 3, 2020

Ya, I had been thinking of something similar, but more elaborate. Somehow arrange to run multiple task managers, each managing their own set of workers, whose concurrency requirements are similar / the same. We probably don't need to get so elaborate as to actually have multiple task managers, but somehow partitioning the tasks in some way. One potential downside to this is increased ES i/o, which we are already sensitive to.

@tsullivan
Member

somehow partitioning the tasks in some way.

I hope we can explore this as the solution. I think there are only 2 partitions needed: unlimited concurrency, and limited or no concurrency.

Each partition has its own query. In the unlimited-concurrency partition, there is no size limit set on the search - the task types take whatever gets fired out. In the limited-concurrency partition, we add a specific size param on the search that reflects the capacity of the task types in that partition.

It seems like there is an upside to this idea, which is that the different partitions couldn't clog each other.
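A sketch of how the two partitioned claiming queries might differ, assuming only the size parameter separates them (field names and the capacity calculation are simplified, not the actual Task Manager query):

// Illustrative: one claiming search per partition; only the size differs.
function partitionedClaimQueries(
  unlimitedTypes: string[],
  limitedTypes: string[],
  limitedCapacity: number, // sum of remaining capacity across limited types
  maxWorkers: number
) {
  const claimableTasksOfTypes = (types: string[]) => ({
    bool: {
      filter: [
        { terms: { 'task.taskType': types } },
        { term: { 'task.status': 'idle' } },
      ],
    },
  });

  return {
    unlimited: { size: maxWorkers, query: claimableTasksOfTypes(unlimitedTypes) },
    limited: { size: limitedCapacity, query: claimableTasksOfTypes(limitedTypes) },
  };
}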

One potential downside to this is increased ES i/o, which we are already sensitive to.

That is true and might make this idea a temporary one. To remove that downside, we would need some enhancement in ES. It would be really great if ES _msearch supported update-by-query. We could carry on with the partitions idea, but have all the queries for each partition be in a single request. If we think that each task type should be a "partition" in the msearch, that would solve the potential problem where task types can clog each other.

@polyfractal
Contributor

Popping into this thread since @tsullivan and I were chatting in slack about it.

An "msearch-enabled update-by-query" would effectively be the same internally as multiple UBQ requests. E.g. it'd be doing the same work, just wrapped up in a single client call instead of multiple client calls. So that is likely a lower-hanging fruit if you wanted to investigate that route.

I'm not sure if there will be motivation to enhance UBQ in ES. We would have to check with the search team... generally UBQ and DBQ are not well-liked by the ES team since they have some pointy edge cases (they don't handle failed shards well, for example).

A more robust method would be getting off UBQ entirely and doing the process yourself: search request (or msearch) to fetch document IDs, then issue a bulk update with the combined set. UBQ is just syntactic sugar for that exact process, so you aren't losing any functionality, and gain considerable flexibility in deciding how documents are queried. It also allows you to handle failure cases better if you want, whereas UBQ would just quit and you're stuck in some kind of weird state.

More work for the client obviously, but UBQ is a relatively simple tool, so I wouldn't expect much time/effort to be put into making it more sophisticated. I'm not the search team though, so they may have different opinions... wouldn't hurt to ping them and see. :)
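For reference, a rough sketch of that search-then-bulk-update flow; `seq_no_primary_term`, `if_seq_no`, and `if_primary_term` are standard Elasticsearch optimistic-concurrency controls, while the claiming details here are simplified:

// Illustrative: claim tasks with a search followed by a bulk update, instead
// of update_by_query.
const claimSearchBody = {
  size: 10,
  seq_no_primary_term: true, // ask ES to return _seq_no/_primary_term per hit
  query: { term: { 'task.status': 'idle' } },
};

// For each hit, emit an update action guarded by the document's seq_no and
// primary_term; a concurrent claim by another Kibana yields a 409 for that
// item only, which can simply be skipped.
function buildBulkClaim(
  hits: Array<{ _id: string; _seq_no: number; _primary_term: number }>
) {
  return hits.flatMap((hit) => [
    { update: { _id: hit._id, if_seq_no: hit._seq_no, if_primary_term: hit._primary_term } },
    { doc: { task: { status: 'claiming' } } },
  ]);
}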

@gmmorris
Contributor

gmmorris commented Aug 5, 2020

Hi @gmmorris I want to go back to:

It's impossible for us to limit the number of tasks of a specific type returned by the update-by-query,

I did some research into Elasticsearch queries to see if there's a way to filter or specify a range for the count of documents in a leaf of a compound query. That's another way of saying what I attempted above in my comment: "set the search size for the max number of tasks that can be in the search results for a particular task type." I've looked quite a bit and have not found a solution, so I'm continuing to reach out.

Thanks for doing that... I, too, haven't found a solution to this.

If Task Manager could access an ES feature in the update_by_query, I really think that's the ideal way to go.

Sorry, I don't follow this... What do you mean by access an ES feature? 🤔

Correct me if I'm wrong, but since the results come from update_by_query, they've already been modified in ES. The update has a script to modify the task status to claiming for each hit in the result. That would be avoided if we could limit the count of each task type in the query itself.

Correct, but I can't figure out a way to do that in the UpdateByQuery... any idea how this could be achieved?

@gmmorris
Contributor

gmmorris commented Aug 5, 2020

@gmmorris @pmuellr what if task types that require nonconcurrency poll for work in a separate loop? Each one of these task types would need their own loop, probably. In the queries that poll for nonconcurrent work, we could have an overall size limit of returned docs set to 1.

Ya, I had been thinking of something similar, but more elaborate. Somehow arrange to run multiple task managers, each managing their own set of workers, whose concurrency requirements are similar / the same. We probably don't need to get so elaborate as to actually have multiple task managers, but somehow partitioning the tasks in some way. One potential downside to this is increased ES i/o, which we are already sensitive to.

Yup, I have investigated this too, but it adds a lot of complexity to the timing in Task Manager. We already have a hard time in terms of observability into this, and I'm seriously worried about how we can support it without first addressing the lack of observability into TM. (I suspect @kobelb might have thoughts on this too.)

Balancing between these different cycles and ensuring that we understand what's happening in there is going to be difficult once you have multiple different concurrency configurations.
I'm not saying it can't be done, but I'm concerned it'll actually be harder for us to maintain and support than a single stream. If we do decide to follow this route, I'd insist we first address that lack of observability; otherwise I don't believe we can effectively support our users, and SDH tickets will likely end up forcing us to add it under pressure.

One of the problems I encountered when experimenting with this was that the tasks without concurrency limitations would "starve" the ones with concurrency limitations once there were many tasks in TM.
So you need to start doing things like: query for the tasks with limited concurrency first, then the ones with less limited concurrency, etc., until you get to the ones with no limits - and that actually did the opposite: we ended up picking up a task that expired 1ms ago instead of one that expired 5 seconds ago, because it had more concurrency limitations.
With Reporting, which is long running, the latter is less of a problem, but if the low-concurrency tasks are short running, and there are lots of them, they cause havoc in the scheduling.
I'm not saying this is a blocker - but it breaks a lot of the TM semantics, and without proper observability we'd never be able to answer questions such as "why are a bunch of alerts running at a delay?" 😕

@gmmorris
Contributor

gmmorris commented Aug 5, 2020

A more robust method would be getting off UBQ entirely and doing the process yourself: search request (or msearch) to fetch document IDs, then issue a bulk update with the combined set. UBQ is just syntactic sugar for that exact process, so you aren't losing any functionality, and gain considerable flexibility in deciding how documents are queried. It also allows you to handle failure cases better if you want, whereas UBQ would just quit and you're stuck in some kind of weird state.

Thanks @polyfractal I really appreciate you weighing in.
We actually used to do this before introducing UBQ and it introduced an upper limit to our ability to horizontally scale Kibana. The reason was that we would encounter a lot more cases where Task Manager across multiple Kibana would claim the same tasks and clash.
This is hard to address in Kibana as we don't cluster Kibana instances (they don't know about each other) so there's no easy way to coordinate them.

The key difference for us has been the time between the query and the update - doing it in the client versus doing it in the UBQ.
That said - I think we could investigate going back to that approach and helping Kibana tackle this better... and as I keep hearing more and more concerns from the ES team itself about our use of UBQ, I'm starting to think it might be worth investigating further how we can move away from it.

@tsullivan
Member

tsullivan commented Aug 5, 2020

If Task Manager could access an ES feature in the update_by_query, I really think that's the ideal way to go.

Sorry, I don't follow this... What do you mean by access an ES feature? 🤔

To say it more plainly, this point of view is about requesting a new feature that can be added to Elasticsearch to benefit Task Manager.

we would encounter a lot more cases where Task Manager across multiple Kibana would claim the same tasks and clash.

I get why using UBQ alleviates timing problems of version conflicts across multiple instances trying to claim tasks with high throughput. The query and update in a single client call seems more "transactional" because the timing works out better. Based on what @polyfractal said about UBQ being syntactic sugar though, I'm trying to get away from thinking of UBQ as a transaction. Maybe (I truly don't know) with UBQ, 2 instances of Kibana could still attempt to update the same task documents (aka "claim the task") and get a version conflict error.

The version conflict problem should be recoverable: if a Kibana can't claim a task because another instance claimed it, then it should ignore it and let the other instance handle it. It is a problem when a subset of bulk documents fail to update and cause the overall set to fail. Maybe that's where the work needs to go: with that fixed, the claiming cycle doesn't need to be "transaction-like."
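For illustration, handling the partial failures per item would look roughly like this (bulk response shape simplified), so a 409 on one document never fails the whole claiming cycle:

// Illustrative: treat per-item 409s as "another Kibana claimed it" and move on.
interface BulkResponseItem {
  update?: { _id: string; status: number; error?: unknown };
}

function partitionClaimResults(items: BulkResponseItem[]) {
  const claimed: string[] = [];
  const claimedElsewhere: string[] = [];
  for (const { update } of items) {
    if (!update) continue;
    if (update.status === 409) {
      claimedElsewhere.push(update._id); // version conflict: let the other instance run it
    } else if (!update.error) {
      claimed.push(update._id);
    }
  }
  return { claimed, claimedElsewhere };
}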

@gmmorris
Contributor

gmmorris commented Aug 5, 2020

To say it more plainly, this point of view is about identifying a new feature that can be added to Elasticsearch to benefit Task Manager.

Ah, yeah, haha, I'm trying to work with what we have, but obviously we'd love it if ES had an idea on their end that could help us.

Maybe (I truly don't know) with UBQ, 2 instances of Kibana could still attempt to update the same task documents (aka "claim the task") and get a version conflict error.

Yeah, it still happens, but not nearly as often - which is why we switched to it.
I've explored ideas such as having one Kibana divvy up work for the others, but this is only on the drawing board for now... to be honest, capacity-wise, I don't think we could go down that road; there's too much other stuff the Alerting team is under pressure to deliver.

The version conflict problem should be recoverable: if a Kibana can't claim a task because another instance claimed it, then it should ignore it and let the other instance handle it. It is a problem when a subset of bulk documents fail to update and cause the overall set to fail.

Sorry, I wasn't clear - it is recoverable (it was one of the first things we addressed in TM), but it still hurt performance, as we ended up with lots of wasted cycles that made it hard to scale horizontally.

I'm happy to jump on a call and run you through everything we've done in TM over the past year. I think you'll find we've addressed all of the low-hanging fruit, and we're now trying to figure out how to fit new functionality within the limitations that we're forced to work with.

@gmmorris
Contributor

gmmorris commented Aug 5, 2020

I've dragged this back into In Progress - I'm going to take another look at how complicated it would be to have a separate poller for task types with limited concurrency.
It would mean ring-fencing a certain number of workers for these tasks, which was something I wanted to avoid, but I'd like to play around with it a bit and see what we can learn.

@polyfractal
Contributor

re: version conflicts

Yeah correct, UBQ still technically can run into the same version conflict issues as regular query-then-update... it just tends to happen less because the round-trip time is smaller/local. But that said, if there is sufficient contention in general, this reduced latency is only a bandaid and will run into scaling issues eventually too. E.g. it just kicks the can down the road if there are fundamental contention issues, which need to be solved in a different manner (multiple work-stealing queues or something).

To say it more plainly, this point of view is about requesting a new feature that can be added to Elasticsearch to benefit Task Manager.

++ Yeah just wanted to mention that I agree, if there's an opportunity here to build something that really solves the issue we shouldn't ignore that. I was just popping in to make a note about UBQ in particular. So don't view my comments as discouraging exploring options, including specialized functionality (or modifying existing functionality) to make things work better.

/cc @jpountz in case we need to grab someone from Search or Core to help look into this :)

@gmmorris
Contributor

++ Yeah just wanted to mention that I agree, if there's an opportunity here to build something that really solves the issue we shouldn't ignore that. I was just popping in to make a note about UBQ in particular. So don't view my comments as discouraging exploring options, including specialized functionality (or modifying existing functionality) to make things work better.

Thanks Zachary 🙏

@gmmorris
Contributor

gmmorris commented Aug 12, 2020

I've spiked an approach where we spawn a separate poller for the limited-concurrency task type, and it seems to work pretty well.
There's a bunch of things I haven't delved into yet (beyond, you know, tests), such as stability, additional load on ES, runNow support, etc., but it shows this could be a good, solid approach.

PR is here: #74883
Would love to hear your thoughts

@pmuellr
Member

pmuellr commented Aug 13, 2020

I think I like this approach :-) Just took a quick peek through the code though ...

It seems like a very clean way to separate these sorts of things out - from a higher level - rather than make the innards more complicated.

My main worry is the additional i/o on ES that this will cause. I think in the case of reporting, we could certainly change the polling interval to be longer, which would I think be the easiest way to cut down on the i/o.

@gmmorris
Contributor

I think I like this approach :-)

So do I :) (thanks)

My main worry is the additional i/o on ES that this will cause. I think in the case of reporting, we could certainly change the polling interval to be longer, which would I think be the easiest way to cut down on the i/o.

Yes, agreed, it's my concern as well.
I was thinking we might need to introduce some kind of "type level config overrides", which could perhaps be part of the type definition. But we'd need to figure out if it makes sense for a type def to override the kibana.yml config 🤷‍♂️ seems sketchy, but we might be able to find an elegant way of expressing it.
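Purely as a thought experiment, such an override might be expressed on the type definition itself, assuming `taskManager` is the plugin's setup contract; `pollIntervalOverride` (and `maxConcurrency`) are hypothetical illustrations here, not existing fields:

// Hypothetical shape only - pollIntervalOverride and maxConcurrency below are
// illustrations of a "type level config override", not part of the current API.
taskManager.registerTaskDefinitions({
  'report:execute': {
    title: 'Reporting: execute job',
    maxConcurrency: 1,
    // Per-type override of the poller settings normally taken from kibana.yml.
    pollIntervalOverride: 60 * 1000,
    createTaskRunner: (context) => ({
      run: async () => {
        /* ... */
      },
    }),
  },
});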

@gmmorris
Contributor

@tsullivan @joelgriffith How do y'all feel about Reporting polling for work at a lower rate than the normal task polling?
You could always use runNow if you need to force an immediate run of the task, otherwise it would wait for the lower frequency poll, which would allow us to avoid adding too much stress to the cluster.

@tsullivan
Member

@tsullivan @joelgriffith How do y'all feel about Reporting polling for work at a lower rate than the normal task polling?
You could always use runNow if you need to force an immediate run of the task, otherwise it would wait for the lower frequency poll, which would allow us to avoid adding too much stress to the cluster.

Currently our polling interval is 3s per server, and I think we should stick to that through the migration of our service.

I think that rate is slower than the normal task polling, so I think we're ok all around on this.

@kobelb
Contributor

kobelb commented Oct 20, 2020

I think I like this approach :-) Just took a quick peek through the code though ...

It seems like a very clean way to separate these sorts of things out - from a higher level - rather than make the innards more complicated.

👍

My main worry is the additional i/o on ES that this will cause. I think in the case of reporting, we could certainly change the polling interval to be longer, which would I think be the easiest way to cut down on the i/o.

For Reporting, there is generally a lot of "idle time". Is a search more performant than an update_by_query? If so, perhaps we could do a simple search to see if there's anything to run prior to running the update_by_query?
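If that turned out to be worthwhile, the pre-check could be as cheap as a size-0 search, roughly along these lines (field names simplified; Reporting's task type is used as an example):

// Illustrative: a size-0 search to see whether any claimable reporting task
// exists before paying for the heavier update_by_query.
const preCheckBody = {
  size: 0,
  terminate_after: 1, // stop counting after the first matching doc per shard
  query: {
    bool: {
      filter: [
        { term: { 'task.taskType': 'report:execute' } },
        { term: { 'task.status': 'idle' } },
      ],
    },
  },
};
// If the search reports zero hits, skip the update_by_query for this cycle.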

tsullivan added the (Deprecated) Feature:Reporting and Team:AppServices labels on Dec 17, 2020
@elasticmachine
Contributor

Pinging @elastic/kibana-app-services (Team:AppServices)

@gmmorris
Contributor

For Reporting, there is generally a lot of "idle time". Is a search more performant than an update_by_query? If so, perhaps we could do a simple search to see if there's anything to run prior to running the update_by_query?

That's an interesting question....
Would doing so mean maintaining two different Task Manager implementations - one for reporting and another for the rest of the stack? 🤔
Off the top of my head I can't think of a way to avoid that, which means we'd have to find a way to make the polling lifecycle a pluggable behaviour. Definitely doable, but I'm not sure it's worth the effort and the long term costs of maintaining two different approaches to polling.

Worth investigating further though.

@gmmorris
Contributor

This has now been delivered 🎉

kobelb added the needs-team label on Jan 31, 2022
The botelastic bot removed the needs-team label on Jan 31, 2022