enable concurrent activation processing #2795
Conversation
Read briefly over the code, good for a start 👍 The changes in ContainerProxy look good to me on first glance. I wonder if we can simplify the scheduling bits, though: what if we always tracked the number of concurrent invocations, and only updated to Busy when that number reaches the configured maximum? In the non-concurrent case that number can be set to 1. I think we could get away without any changes to the scheduler in that case, since the ContainerPool itself keeps the state-tracking consistent. WDYT?
@markusthoemmes I think you mean to keep containers in freePool, and only move them to busyPool when they reach the max? (I'm not sure that WorkerState is used anymore?) If we do that, then ContainerPool.remove needs to pay attention to the concurrent invocations, and there will potentially be a lot of shuffling between freePool and busyPool when traffic hovers around the max. In the other case (where the container moves to busy after 1), there is potentially a lot of shuffling when there is a stream of sequential events (1 at a time), so maybe this doesn't matter. I'll see if I can change the remove to respect the concurrent activation count and avoid complicating the scheduling as you describe. A rough sketch of the counting approach is below.
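To make the trade-off concrete, here is a minimal sketch of the counting approach being discussed — this is not the PR's actual code; the pool type and field names are illustrative:

```scala
// Track in-flight activations per container and promote a container to busyPool
// only once the count reaches maxConcurrent; with maxConcurrent = 1 this degrades
// to the existing one-activation-per-container behavior.
case class CountingPool[Ref](maxConcurrent: Int,
                             free: Map[Ref, Int] = Map.empty[Ref, Int],
                             busy: Map[Ref, Int] = Map.empty[Ref, Int]) {

  /** One more activation started on `ref`; promote to busy when the limit is hit. */
  def acquire(ref: Ref): CountingPool[Ref] = {
    val count = free.getOrElse(ref, busy.getOrElse(ref, 0)) + 1
    if (count >= maxConcurrent) copy(free = free - ref, busy = busy + (ref -> count))
    else copy(free = free + (ref -> count))
  }

  /** One activation finished on `ref`; demote to free when back under the limit. */
  def release(ref: Ref): CountingPool[Ref] = {
    val count = math.max(0, free.getOrElse(ref, busy.getOrElse(ref, 1)) - 1)
    if (count < maxConcurrent) copy(free = free + (ref -> count), busy = busy - ref)
    else copy(busy = busy + (ref -> count))
  }
}
```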
@tysonnorris, is this PR still valid?
@dubeejw yes, still valid, but it will need to be updated to resolve conflicts; I'll get back to it asap.
Assigning to @tysonnorris until out of WIP.
@tysonnorris, looks like this one needs a rebase.
How is it coming along?
I have some other work in progress for HA support on Mesos, then will get back to this ASAP.
@markusthoemmes @style95 @upgle this is ready for review
I like the approach taken here very much, as it falls back nicely to the default behavior if concurrency is configured to 1 (as opposed to having a large Boolean switch that switches implementations).
Let's clarify the state-keeping and then I think we're good 👍
# create a noop action
echo "Creating action $action"
curl -k -u "$credentials" "$host/api/v1/namespaces/_/actions/$action" -XPUT -d '{"namespace":"_","name":"test","exec":{"kind":"nodejs:default","code":"function main(){ return new Promise(function(resolve, reject) { setTimeout(function() { resolve({ done: true }); }, 175); }) }"}}' -H "Content-Type: application/json"
Should we rather parametrize the existing create.sh on the action code and cat that from a file?
"$host/api/v1/namespaces/_/actions/$action?blocking=true" \ | ||
--latency \ | ||
--timeout 10s \ | ||
--script post.lua |
Same here, shall we parametrize on the action-file name so we can reuse one script?
You mean for post.lua? Does this need to be parameterized?
Maybe you meant to parameterize the throughput.sh script... will do that.
@@ -107,6 +106,7 @@ class ContainerProxy(
  implicit val ec = context.system.dispatcher
  implicit val logging = new AkkaLogging(context.system.log)
  var rescheduleJob = false // true iff actor receives a job but cannot process it because actor will destroy itself
  var activeActivationCount = 0
WDYT about making this part of WarmedData or even ContainerData? That would eschew the need for the ActivationCounter and make other parts easier as well (I'll add comments for posterity).
val freeContainers = pool.collect {
  case (ref, w: WarmedData) => ref -> w
  case (ref, w: WarmedData) if !checkConcurrent || ActivationCounter.getOrElse(ref, 0) == 0 =>
This becomes
case (ref, w: WarmedData) if w.activationCount == 0 =>
and you can remove the boolean flag if we can add the counter to WarmedData. A simplified sketch follows.
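For illustration, a compact sketch of what that could look like — simplified stand-ins, since the real ContainerData/WarmedData carry more fields, and the counter field here uses the activeActivationCount name the PR later settled on:

```scala
import java.time.Instant

// simplified stand-ins for the real classes
sealed trait ContainerData { def activeActivationCount: Int }
case class WarmedData(lastUsed: Instant, activeActivationCount: Int = 0) extends ContainerData

// freePool selection no longer needs a separate ActivationCounter map or a boolean flag
def freeContainers[Ref](pool: Map[Ref, ContainerData]): Map[Ref, WarmedData] =
  pool.collect {
    case (ref, w: WarmedData) if w.activeActivationCount == 0 => ref -> w
  }
```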
Tyson, do we need to guard the annotation to apply only to runtimes that can support concurrent activations? Will this need an attribute in the runtime manifest?
This is currently a system-level switch, which will enable concurrent activation handling everywhere. I'll create a separate PR for enabling it selectively, per action. Ideally, yes, it would only be allowed for action images that support it. This could be done (not sure it is required, but it would certainly be convenient) in the runtime manifest for kinds, and I guess we can skip this assertion for blackbox images (presumably the developer will only enable it on a blackbox image he knows supports it?).
@tysonnorris we have users extending nodejs as blackbox docker actions because they have node_modules larger than 50MB, so these users might want to use concurrency. There might be kinds that do not support concurrency.
+1 to having a runtime manifest attribute
For this incremental PR, the operator who enables concurrency is responsible for the consequences of doing so.
I would address the additional kind validations, action annotations, etc. in separate PRs.
Having the predicate visible in the runtime manifest would be convenient for clients (the wsk CLI) to validate and fail early if the annotation is added to an unsupported runtime. Blackbox is exempted, I think: user beware.
Codecov Report

@@            Coverage Diff             @@
##           master    #2795      +/-   ##
==========================================
- Coverage   84.57%   81.24%     -3.34%
==========================================
  Files         148      151         +3
  Lines        7110     7255       +145
  Branches      431      466        +35
==========================================
- Hits         6013     5894       -119
- Misses       1097     1361       +264

Continue to review full report at Codecov.
class ShardingContainerPoolBalancer(
  config: WhiskConfig,
  controllerInstance: ControllerInstanceId,
  private val feedFactory: (ActorRefFactory, MessagingProvider, (Array[Byte]) => Future[Unit]) => ActorRef,
@tysonnorris Maybe we can use traits here instead of function arguments. That makes it easy in an IDE to see what the implementation is and to move back and forth easily. A hypothetical shape is sketched below.
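A sketch of what that trait-based alternative might look like — the trait and method names are hypothetical, and MessagingProvider is stubbed so the snippet stands alone:

```scala
import akka.actor.{ActorRef, ActorRefFactory}
import scala.concurrent.Future

trait MessagingProvider // stand-in for the existing OpenWhisk trait

// a named factory trait replacing the function-typed constructor argument,
// so IDE navigation lands on a concrete implementation rather than a lambda
trait FeedFactory {
  def createFeed(factory: ActorRefFactory,
                 provider: MessagingProvider,
                 handler: Array[Byte] => Future[Unit]): ActorRef
}

// class ShardingContainerPoolBalancer(..., feedFactory: FeedFactory, ...)
```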
case (_, WarmedData(_, `invocationNamespace`, `action`, _)) => true
case _ => false
case c @ (_, WarmedData(_, `invocationNamespace`, `action`, _, _))
    if c._2.activeActivationCount < action.limits.concurrency.maxConcurrent =>
Can be simplified a bit
case (_, WarmedData(_, `invocationNamespace`, `action`, _, activeActivationCount))
if activeActivationCount < action.limits.concurrency.maxConcurrent =>
case Some(((actor, data), containerState)) =>
  busyPool = busyPool + (actor -> data)
  freePool = freePool - actor
case Some(((actor, data), containerState, activeActivations)) =>
I think activeActivationCount in data (which is ContainerData) == activeActivations. In that case we can avoid extracting activeActivationCount in the steps before.
will try to remove that 👍
@cbickel Do you think you'll have time soon to review this?
@chetanmeh any other comments on this?
I'm not through the whole PR yet, but here are the first few comments.
.travis.yml
Outdated
@@ -75,11 +75,17 @@ jobs:
  name: "System Tests"
  - script:
    - ./tests/performance/preparation/deploy.sh
    - TERM=dumb ./tests/performance/wrk_tests/latency.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" 2m
    - TERM=dumb ./tests/performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" 4 2 2m
    - TERM=dumb ./tests/performance/wrk_tests/latency.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./performance/preparation/actions/noop.js 2m
Would it make sense to use the action tests/performance/preparation/actions/noop.js here, and delete the file performance/preparation/actions/noop.js? That applies to async.js as well.
sorry, these are left over from when the performance directory was moved, long ago...
@@ -0,0 +1,12 @@
// Licensed to the Apache Software Foundation (ASF) under one or more contributor
Has this file been created by accident? Or where is it used?
deleted the dupe
@@ -0,0 +1,5 @@
// Licensed to the Apache Software Foundation (ASF) under one or more contributor
This action is a duplicate as well.
tools/travis/setupSystem.sh
Outdated
@@ -25,7 +25,7 @@ ROOTDIR="$SCRIPTDIR/../.."

cd $ROOTDIR/ansible

$ANSIBLE_CMD openwhisk.yml
$ANSIBLE_CMD -e container_pool_akka_client=true -e runtimes_enable_concurrency=true openwhisk.yml
Could you please move these parameters to the environment? Otherwise it could be very confusing when debugging which parameters were used in Travis.
Do you mean to define vars in runSystemTests.sh? Or in Travis settings? I'm not sure moving them to the other scripts is less confusing, but I can do that; I'm not sure we should define them in Travis settings, though.
BTW there are similar changes to ./tests/performance/preparation/deploy.sh
No, I mean moving them to the environment in ansible that is used in Travis.
ok, like this? ebf7a18
Some more feedback focusing on ContainerProxy
@@ -29,7 +29,8 @@ case class ContainerArgsConfig(network: String,
  dnsServers: Seq[String] = Seq.empty,
  extraArgs: Map[String, Set[String]] = Map.empty)

case class ContainerPoolConfig(userMemory: ByteSize, akkaClient: Boolean) {
case class ContainerPoolConfig(userMemory: ByteSize, concurrentPeekFactor: Double, akkaClient: Boolean) {
  require(concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0, "concurrentPeekFactor must be > 0 and <= 1.0")
Include the current value in the assertion message.
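For instance, one possible phrasing of the same require using an interpolated string:

```scala
require(
  concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
  s"concurrentPeekFactor must be > 0 and <= 1.0 (was $concurrentPeekFactor)")
```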
//number of peeked messages - increasing the concurrentPeekFactor improves concurrent usage, but adds risk for message loss in case of crash
private val maxPeek =
  math.max(maximumContainers, (maximumContainers * limitsConfig.max * poolConfig.concurrentPeekFactor).toInt)
In the default setup, with limitsConfig.max = 500, MemoryLimit.minMemory of 128M, and a peekFactor of 0.5, it becomes roughly userMemory_in_MB * 2. So on a box with userMemory of, say, 20GB: maxPeek = 20 * 1024 * 2 ≈ 41000.
This would be a high number, and if those activations are big then it can bring down the invoker. Maybe we have a lower default value for limitsConfig.max, and later look into having a more streaming way of reading activations from Kafka.
I didn't quite follow your math; are there some typos? In any case, this will vary based on load, but the defaults should be better.
I will change the default max to 1 (concurrency disabled by default; only enabled by default in the "local" env for test runs).
Let me try again.

val maximumContainers = (poolConfig.userMemory / MemoryLimit.minMemory).toInt

Here:
- MemoryLimit.minMemory = 128 MB
- poolConfig.userMemory = x MB

So maximumContainers = x/128.

Now:

val maxPeek =
  math.max(maximumContainers, (maximumContainers * limitsConfig.max * poolConfig.concurrentPeekFactor).toInt)

where earlier the defaults were:
- limitsConfig.max = 500
- poolConfig.concurrentPeekFactor = 0.5

So:

maxPeek = max(x/128, x/128 * 500 * 0.5)
maxPeek = max(x/128, ~2x)

So if poolConfig.userMemory is 32 GB = 32768 MB, then maxPeek ≈ 65536, which is a high number. Considering a worst case of 1 MB per activation, it results in loading up to 65 GB of data. So maybe we introduce an explicit config for maxPeek?
Would it be better to just define a different value for concurrentPeekFactor?
My point is that whether we specify a value for maxPeek directly, or indirectly via the existing concurrentPeekFactor, there is still going to be potential for exhausting the system. E.g. we could set it to 0.1: if you think that specifying maxPeek=12800 is easier to understand than concurrentPeekFactor=0.1, we can do that. But in neither case does it directly translate to actual behavior in the system, since the message peeking (a) pays no attention to differences between actions or to the memory impact of activation requests/responses, and (b) must be increased to allow concurrent activations to be processed at all. This is a known tradeoff of enabling concurrency for now. The numbers from this exchange are worked through below.
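A back-of-envelope check of the formula with the values from this thread (plain arithmetic, not code from the PR):

```scala
// values from the discussion above
val userMemoryMB = 32 * 1024                       // 32 GB of invoker user memory
val minMemoryMB  = 128                             // MemoryLimit.minMemory
val maximumContainers = userMemoryMB / minMemoryMB // 256

def maxPeek(limitsMax: Int, peekFactor: Double): Int =
  math.max(maximumContainers, (maximumContainers * limitsMax * peekFactor).toInt)

maxPeek(500, 0.5) // 64000 (~ the 65536 "2x userMemory" approximation above)
maxPeek(500, 0.1) // 12800, the alternative mentioned above
maxPeek(1, 0.5)   // 256, i.e. concurrency disabled by default
```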
@@ -147,7 +158,7 @@ class ContainerProxy(
  case Success(container) =>
We should also update the ScalaDoc for ContainerProxy.
if (!initInterval.isEmpty) {
  self ! WarmedData(container, job.msg.user.namespace.name, job.action, Instant.now, 1)
}
val parameters = job.msg.content getOrElse JsObject()
Retain the previous usage of JsObject.empty.
}

val activation: Future[WhiskActivation] = initialize
  .flatMap { initInterval =>
    val parameters = job.msg.content getOrElse JsObject.empty
    //immediately setup warmedData for use (before first execution) so that concurrent actions can use it asap
    if (!initInterval.isEmpty) {
Use initInterval.isDefined
case class PreWarmedData(container: Container,
                         kind: String,
                         override val memoryLimit: ByteSize,
                         override val activeActivationCount: Int = 0)
In which case would a PreWarmedData have activeActivationCount != 0? PreWarmedData is by definition the state prior to any activation being run.
A state transition diagram would be handy here to understand the transitions better!
Agreed, will work on that.
PreWarmedData's count may be 1 when emitted during cold start, to signal to the parent that it is prewarmed but already has an active activation in flight, so that the pool can use it asap while also respecting the concurrency limit. A small illustration is below.
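An illustration of that cold-start signal — a deliberately simplified shape, not the full class from the diff above (which also carries the container and memoryLimit):

```scala
// simplified: the real PreWarmedData also carries the container and memoryLimit
case class PreWarmedData(kind: String, activeActivationCount: Int = 0)

// normal prewarm: no activations yet
val prewarmed = PreWarmedData("nodejs:default")

// cold start: one activation is already running on the brand-new container,
// so the pool can schedule more work while still counting the one in flight
val coldStartSignal = PreWarmedData("nodejs:default", activeActivationCount = 1)
```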
// Run was successful
case Event(data: WarmedData, _) =>
// Init was successful
case Event(data: WarmedData, _: PreWarmedData) =>
Note for myself - in the Started state the following sequence happens:

1. The proxy transitions to Running with PreWarmedData (activeCount = 1).
2. It then sends the msg WarmedData (count = 1) in initializeAndRun, assuming this is the first Run being processed.
3. It then sends the msg WarmedData with count = 0.

So each call to initializeAndRun ends up sending 2 WarmedData messages: one with count = 0 and one with the count at the time of the start of the message.
For 3, it should send WarmedData with count 0 only to itself, as a completion notification, and to the parent only in the case where all active activations are completed. There is a confusing aspect here, which is that WarmedData is used BOTH for signaling completion (active count is ignored) on the internal state machine AND for signaling "NeedWork" to the parent (active count is important).
stay using data

// Run was successful
case Event(_: WarmedData, s: WarmedData) =>
@tysonnorris Here the use of WarmedData both as a message and as state is a bit confusing. Maybe we use new event types:

- for a successful run - RunCompleted
- for init - Init

Further, we set the activeActivationCount in PreWarmedData to 0 and make it constant, i.e. not provide a way to change it from 0 for PreWarmedData. A sketch of such event types follows.
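A hypothetical sketch of those dedicated event types (the event names come from the suggestion; WarmedData is stubbed so the snippet compiles on its own):

```scala
case class WarmedData(activeActivationCount: Int = 0) // stub for the real class

// dedicated FSM events, so WarmedData is only ever state, never a message
sealed trait ProxyEvent
case object RunCompleted extends ProxyEvent                    // a run finished
case class InitCompleted(data: WarmedData) extends ProxyEvent  // init finished; container is warm
```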
Yes, I just mentioned that above - I will take a stab at replacing it with RunCompleted.
@@ -346,7 +381,8 @@ object ContainerPool {
  def props(factory: ActorRefFactory => ActorRef,
            poolConfig: ContainerPoolConfig,
            feed: ActorRef,
            prewarmConfig: List[PrewarmingConfig] = List.empty) =
            prewarmConfig: List[PrewarmingConfig] = List.empty,
            maxConcurrent: Int = 1) =
maxConcurrent not used?
oops, leftover from refactoring
@@ -197,11 +230,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
  // Container got removed
  case ContainerRemoved =>
    freePool = freePool - sender()
    busyPool.get(sender()).foreach { _ =>
This change does not look like it is required for concurrency support, so better to retain the existing approach of sending the message only if busyPool had the sender.
@dubee any update on PG?
This looks generally good but there are some critical parts that make me nervous - would you revolt if I suggested that we break this PR up into at least 3?
- introduce the concurrency limit schema and config related changes
- if it's indeed separable (I think it is, from my initial review), the feed modifications
- the load balancer and invoker related changes (maybe this is two parts)
@@ -72,8 +75,9 @@ protected[core] object ActionLimits extends ArgNormalizer[ActionLimits] with Def
  val time = TimeLimit.serdes.read(obj.get("timeout") getOrElse deserializationError("'timeout' is missing"))
  val memory = MemoryLimit.serdes.read(obj.get("memory") getOrElse deserializationError("'memory' is missing"))
  val logs = obj.get("logs") map { LogLimit.serdes.read(_) } getOrElse LogLimit()
  val concurrency = obj.get("concurrency") map { ConcurrencyLimit.serdes.read(_) } getOrElse ConcurrencyLimit()
LGTM, but note to self: check there is a test for successful serdes of the previous schema.
ActionLimits(time, memory, logs)
ActionLimits(time, memory, logs, concurrency)
is there/will there be a downstream PR to add this to the CLI?
indeed; starting with client-go: apache/openwhisk-client-go#94
I scrutinized the critical parts of the PR in the invoker over several days, along with the accompanying tests. I think the corner cases (and off-by-one errors) are covered. I'm a little concerned about landing all this in one big swoop, but it's been in the works for a while and it is unreasonable to request restructuring this PR now. I think the behavior should be the same as pre-this-PR if the feature is not enabled.
@tysonnorris thanks for being patient with this and working through all the nuances. Looking forward to feedback from production on this feature and what the user experience will be like from real code.
@tysonnorris Thanks a lot for this PR :) Great job.
@tysonnorris Thank you for your efforts.
Thanks a lot @tysonnorris, a huge piece of good work 🎉. Also thanks to @rabbah, @cbickel and @chetanmeh for reviewing and @dubee for handling pull-request mechanics. Great team effort 🚀
Thanks @tysonnorris This is a great milestone to achieve! 🎉
Thanks for all the help from everyone - it is greatly appreciated!
This patch adds intra-container concurrency for actions that run inside a runtime which supports concurrency (currently nodejs runtimes only). Actions must opt into this behavior by setting the appropriate limit on the action. There are some caveats wrt log collection, and the user should be aware that the memory limit applies globally to all instances of the action running inside the container; in other words, the memory limit is a hard cap irrespective of the max intra-container concurrency.
To validate operation, I've tested with the throughput.sh script using an action that has an async response after 175ms. In this type of scenario, I see 2x throughput over the previous sequential processing (e.g. 10-15 rps before, 20-40 rps after).
Note that, as discussed offline, in the current form, with Kafka between controller and invoker, there is no way to account for non-concurrent actions blocking resource usage by those that support concurrency. So currently the code assumes that all actions support concurrency; the ContainerPool scheduler accounts for concurrency settings per action, while MessageFeed only accounts for concurrency across the invoker in general (via maximumHandlerCapacity). A sketch of opting an action in via the new limit follows.
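For reference, a hedged sketch of how an action opts in through the new limits schema introduced here — the limit classes are simplified stand-ins (the real serdes, defaults, and validation live in the PR):

```scala
// simplified stand-ins for the limit classes touched by this PR
case class TimeLimit(millis: Int = 60000)
case class MemoryLimit(megabytes: Int = 256)
case class LogLimit(megabytes: Int = 10)
case class ConcurrencyLimit(maxConcurrent: Int = 1) {
  require(maxConcurrent >= 1, s"maxConcurrent must be >= 1 (was $maxConcurrent)")
}
case class ActionLimits(time: TimeLimit = TimeLimit(),
                        memory: MemoryLimit = MemoryLimit(),
                        logs: LogLimit = LogLimit(),
                        concurrency: ConcurrencyLimit = ConcurrencyLimit())

// opt an action into up to 10 concurrent activations per container;
// everything else keeps its default, so concurrency stays off unless requested
val limits = ActionLimits(concurrency = ConcurrencyLimit(maxConcurrent = 10))
```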