Skip to content

Commit

Permalink
optionally enable concurrency in action containers
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonnorris committed Nov 14, 2018
1 parent 33bb0e7 commit 789e7a3
Show file tree
Hide file tree
Showing 57 changed files with 2,041 additions and 238 deletions.
10 changes: 8 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,17 @@ jobs:
name: "Multi-Runtime Tests"
- script:
- ./tests/performance/preparation/deploy.sh
- TERM=dumb ./tests/performance/wrk_tests/latency.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" 2m
- TERM=dumb ./tests/performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" 4 2 2m
- TERM=dumb ./tests/performance/wrk_tests/latency.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./tests/performance/preparation/actions/noop.js 2m
- TERM=dumb ./tests/performance/wrk_tests/latency.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./tests/performance/preparation/actions/async.js 2m
- TERM=dumb ./tests/performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./tests/performance/preparation/actions/noop.js 4 1 2 2m
- TERM=dumb ./tests/performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./tests/performance/preparation/actions/async.js 4 1 2 2m
- TERM=dumb ./tests/performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./tests/performance/preparation/actions/noop.js 100 110 2 2m
- TERM=dumb ./tests/performance/wrk_tests/throughput.sh "https://172.17.0.1:10001" "$(cat ansible/files/auth.guest)" ./tests/performance/preparation/actions/async.js 100 110 2 2m
- OPENWHISK_HOST="172.17.0.1" CONNECTIONS="100" REQUESTS_PER_SEC="1" ./gradlew gatlingRun-org.apache.openwhisk.ApiV1Simulation
- OPENWHISK_HOST="172.17.0.1" MEAN_RESPONSE_TIME="1000" API_KEY="$(cat ansible/files/auth.guest)" EXCLUDED_KINDS="python:default,java:default,swift:default" PAUSE_BETWEEN_INVOKES="100" ./gradlew gatlingRun-org.apache.openwhisk.LatencySimulation
- OPENWHISK_HOST="172.17.0.1" API_KEY="$(cat ansible/files/auth.guest)" CONNECTIONS="100" REQUESTS_PER_SEC="1" ./gradlew gatlingRun-org.apache.openwhisk.BlockingInvokeOneActionSimulation
- OPENWHISK_HOST="172.17.0.1" API_KEY="$(cat ansible/files/auth.guest)" CONNECTIONS="100" REQUESTS_PER_SEC="1" ASYNC="true" ./gradlew gatlingRun-org.apache.openwhisk.BlockingInvokeOneActionSimulation
# The following configuration is not very meaningful because we do not have enough users, but it is still useful to verify that the test keeps working.
- OPENWHISK_HOST="172.17.0.1" USERS="1" REQUESTS_PER_SEC="1" ./gradlew gatlingRun-org.apache.openwhisk.ColdBlockingInvokeSimulation
- ./tools/travis/checkAndUploadLogs.sh perf
name: "Performance Tests"
4 changes: 4 additions & 0 deletions ansible/environments/local/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ kafka_topics_invoker_retentionBytes: 104857600
kafka_topics_invoker_retentionMS: 300000

env_hosts_dir: "{{ playbook_dir }}/environments/local"

container_pool_akka_client: true
runtimes_enable_concurrency: true
limit_action_concurrency_max: 500
4 changes: 4 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@
"CONFIG_whisk_timeLimit_max": "{{ limit_action_time_max | default() }}"
"CONFIG_whisk_timeLimit_std": "{{ limit_action_time_std | default() }}"

"CONFIG_whisk_concurrencyLimit_min": "{{ limit_action_concurrency_min | default() }}"
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"

"CONFIG_whisk_activation_payload_max":
"{{ limit_activation_payload | default() }}"

Expand Down
5 changes: 5 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,13 @@
"CONFIG_whisk_timeLimit_min": "{{ limit_action_time_min | default() }}"
"CONFIG_whisk_timeLimit_max": "{{ limit_action_time_max | default() }}"
"CONFIG_whisk_timeLimit_std": "{{ limit_action_time_std | default() }}"
"CONFIG_whisk_concurrencyLimit_min": "{{ limit_action_concurrency_min | default() }}"
"CONFIG_whisk_concurrencyLimit_max": "{{ limit_action_concurrency_max | default() }}"
"CONFIG_whisk_concurrencyLimit_std": "{{ limit_action_concurrency_std | default() }}"
"CONFIG_whisk_activation_payload_max": "{{ limit_activation_payload | default() }}"
"CONFIG_whisk_transactions_header": "{{ transactions.header }}"
"CONFIG_whisk_containerPool_akkaClient": "{{ container_pool_akka_client | default('false') }}"
"CONFIG_whisk_containerFactory_containerArgs_extraArgs_env_0": "__OW_ALLOW_CONCURRENT={{ runtimes_enable_concurrency | default('false') }}"
"CONFIG_whisk_invoker_protocol": "{{ invoker.protocol }}"
"CONFIG_whisk_invoker_https_keystorePath": "/conf/{{ invoker.ssl.keystore.name }}"
"CONFIG_whisk_invoker_https_keystorePassword": "{{ invoker.ssl.keystore.password }}"
Expand Down
10 changes: 10 additions & 0 deletions common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ whisk {
std = 10 m
}

# action concurrency-limit configuration
concurrency-limit {
min = 1
max = 1
std = 1
}

mesos {
master-url = "http://localhost:5050" //your mesos master
master-public-url = "http://localhost:5050" // if mesos-link-log-message == true, this link will be included with the static log message (may or may not be different from master-url)
Expand Down Expand Up @@ -286,3 +293,6 @@ whisk {
#}
}
}
#placeholder for test overrides so that tests can override defaults in application.conf (todo: move all defaults to reference.conf)
test {
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class ForcibleSemaphore(maxAllowed: Int) {
}
}

val sync = new Sync
private val sync = new Sync

/**
* Acquires the given numbers of permits.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.common

import scala.collection.concurrent.TrieMap

/**
 * A semaphore that coordinates memory permits (inherited from ForcibleSemaphore) with per-action concurrency
 * slots (one ResizableSemaphore per action id):
 * - when maxConcurrent == 1, every call is delegated straight to the parent semaphore
 * - otherwise a free concurrency slot is used when there is one; memory permits are only acquired when no slot
 *   is free, and that acquisition happens while holding the monitor of the action's slot semaphore
 *
 * @param memoryPermits the permit count handed to the parent ForcibleSemaphore
 * @tparam T type of the key that identifies an action
 */
class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPermits) {
  // one entry per action; the semaphore stored here is resized per container
  private val actionConcurrentSlotsMap = TrieMap.empty[T, ResizableSemaphore]

  /**
   * Attempts to acquire capacity for one activation of the given action.
   *
   * @param actionid key of the action
   * @param maxConcurrent concurrency limit of the action; 1 bypasses the concurrency slots entirely
   * @param memoryPermits memory permits to take from the parent semaphore when a new batch of slots is needed
   * @return `true` if either a concurrency slot or the memory permits were acquired
   */
  final def tryAcquireConcurrent(actionid: T, maxConcurrent: Int, memoryPermits: Int): Boolean =
    maxConcurrent match {
      case 1 => super.tryAcquire(memoryPermits)
      case _ => tryOrForceAcquireConcurrent(actionid, maxConcurrent, memoryPermits, force = false)
    }

  /**
   * Coordinated permit acquisition:
   * - try to take a concurrency slot without any locking
   * - if that fails, lock on this action's slot semaphore and
   *   - try the concurrency slot once more (another thread may have just released slots)
   *   - otherwise acquire memory (forced or attempted, depending on `force`)
   *   - when memory was acquired, release `maxConcurrent - 1` concurrency slots for later callers
   * - the result is `false` only when neither a concurrency slot nor memory could be acquired
   *
   * @param actionid key of the action
   * @param maxConcurrent concurrency limit of the action (used as the reduction size of a new slot semaphore)
   * @param memoryPermits memory permits to take from the parent semaphore
   * @param force whether memory is force-acquired instead of merely attempted
   * @return whether capacity was acquired
   */
  private def tryOrForceAcquireConcurrent(actionid: T,
                                          maxConcurrent: Int,
                                          memoryPermits: Int,
                                          force: Boolean): Boolean = {
    val slots = actionConcurrentSlotsMap.getOrElseUpdate(actionid, new ResizableSemaphore(0, maxConcurrent))
    // lock-free attempt first; everything after `||` only runs when no slot was free
    slots.tryAcquire(1) || slots.synchronized {
      // second attempt while holding the lock
      slots.tryAcquire(1) || {
        val memoryAcquired =
          if (force) {
            super.forceAcquire(memoryPermits)
            true
          } else {
            super.tryAcquire(memoryPermits)
          }
        if (memoryAcquired) {
          // this caller implicitly occupies one slot; the remaining ones become available
          slots.release(maxConcurrent - 1, opComplete = false)
        }
        memoryAcquired
      }
    }
  }

  /**
   * Acquires capacity for one activation of the given action, forcing the memory acquisition if no concurrency
   * slot is free.
   *
   * @param actionid key of the action
   * @param maxConcurrent concurrency limit of the action; 1 bypasses the concurrency slots entirely
   * @param memoryPermits memory permits to force-acquire from the parent semaphore; must be positive
   */
  def forceAcquireConcurrent(actionid: T, maxConcurrent: Int, memoryPermits: Int): Unit = {
    require(memoryPermits > 0, "cannot force acquire negative or no permits")
    maxConcurrent match {
      case 1 => super.forceAcquire(memoryPermits)
      case _ => tryOrForceAcquireConcurrent(actionid, maxConcurrent, memoryPermits, force = true)
    }
  }

  /**
   * Releases the capacity held by one activation of the given action.
   *
   * For maxConcurrent == 1 the memory permits are released directly. Otherwise one concurrency slot is released;
   * the memory permits are released only when the slot semaphore reports a batch reduction, and the action's
   * entry is dropped from the map when the slot semaphore reports that its operation count reached zero.
   *
   * @param actionid key of the action (must have an entry in the map when maxConcurrent != 1)
   * @param maxConcurrent concurrency limit of the action
   * @param memoryPermits memory permits to hand back to the parent semaphore; must be positive
   */
  def releaseConcurrent(actionid: T, maxConcurrent: Int, memoryPermits: Int): Unit = {
    require(memoryPermits > 0, "cannot release negative or no permits")
    if (maxConcurrent == 1) {
      super.release(memoryPermits)
    } else {
      actionConcurrentSlotsMap(actionid).release(1, opComplete = true) match {
        case (memoryRelease, actionRelease) =>
          if (memoryRelease) {
            super.release(memoryPermits)
          }
          if (actionRelease) {
            actionConcurrentSlotsMap.remove(actionid)
          }
      }
    }
  }

  // for testing: a read-only snapshot of the per-action slot semaphores
  def concurrentState = actionConcurrentSlotsMap.readOnlySnapshot()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.common

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.AbstractQueuedSynchronizer
import scala.annotation.tailrec

/**
 * A semaphore with a specialized release process that optionally reduces the number of permits in batches.
 *
 * When the permit count after a release is a multiple of `reductionSize`, the release sets the permits to
 * `state + releases - reductionSize`; otherwise it sets them to `state + releases`.
 * It also maintains an operation count, where a successful `tryAcquire` plus a completing `release` is a single
 * operation, so that callers can tell once all operations are completed.
 *
 * @param maxAllowed the initial number of permits
 * @param reductionSize the batch size by which permits are reduced on release; must be positive
 * @throws IllegalArgumentException if `reductionSize` is not positive
 */
class ResizableSemaphore(maxAllowed: Int, reductionSize: Int) {
  // Fail fast: a zero reductionSize would raise ArithmeticException from the modulo on the first release, and a
  // negative one would make the "reduction" add permits instead of removing them.
  require(reductionSize > 0, "reductionSize must be positive")

  // number of operations in flight: incremented on acquire/allocation, decremented on completion
  private val operationCount = new AtomicInteger(0)

  class Sync extends AbstractQueuedSynchronizer {
    setState(maxAllowed)

    def permits: Int = getState

    /**
     * Adds `releases` permits and, if the resulting count is a multiple of `reductionSize`, removes one batch of
     * `reductionSize` permits again. Retries until the compare-and-set succeeds.
     *
     * @return `true` if a batch reduction was applied by this release, `false` otherwise
     */
    @tailrec
    final def tryReleaseSharedWithResult(releases: Int): Boolean = {
      val current = getState
      val released = current + releases
      val reduced = released % reductionSize == 0
      // next MIGHT be < current in case of reduction; this is OK!!!
      val next = if (reduced) released - reductionSize else released
      if (compareAndSetState(current, next)) {
        reduced
      } else {
        tryReleaseSharedWithResult(releases)
      }
    }

    /**
     * Try to acquire permits. Requests may not finish in FIFO order, hence this method is not necessarily fair.
     *
     * @return the number of permits remaining after the acquisition; a negative value means nothing was acquired
     */
    @tailrec
    final def nonFairTryAcquireShared(acquires: Int): Int = {
      val available = getState
      val remaining = available - acquires
      if (remaining < 0 || compareAndSetState(available, remaining)) {
        remaining
      } else {
        nonFairTryAcquireShared(acquires)
      }
    }
  }

  val sync = new Sync

  /**
   * Acquires the given number of permits if that many are available, and counts a new operation on success.
   *
   * @param acquires the number of permits to get; must be positive
   * @return `true` if the permits were acquired, `false` if not enough permits were available
   */
  def tryAcquire(acquires: Int = 1): Boolean = {
    require(acquires > 0, "cannot acquire negative or no permits")
    if (sync.nonFairTryAcquireShared(acquires) >= 0) {
      operationCount.incrementAndGet()
      true
    } else {
      false
    }
  }

  /**
   * Releases the given amount of permits.
   *
   * @param acquires the number of permits to release; must be positive
   * @param opComplete `true` if this release completes an operation (decrements the operation count),
   *                   `false` if it is an allocation that starts one (increments the operation count)
   * @return (releaseMemory, releaseAction) releaseMemory is true if the permit count after the release was a
   *         multiple of reductionSize (a batch was removed); releaseAction is true if the operation count
   *         reached 0
   */
  def release(acquires: Int = 1, opComplete: Boolean): (Boolean, Boolean) = {
    require(acquires > 0, "cannot release negative or no permits")
    // release always succeeds, so the operation count can be adjusted before touching the permits
    val releaseAction =
      if (opComplete) operationCount.decrementAndGet() == 0
      else operationCount.incrementAndGet() == 0
    (sync.tryReleaseSharedWithResult(acquires), releaseAction)
  }

  /** Returns the number of currently available permits. */
  def availablePermits: Int = sync.permits

  // for testing
  def counter: Int = operationCount.get()
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ object ConfigKeys {
val memory = "whisk.memory"
val timeLimit = "whisk.time-limit"
val logLimit = "whisk.log-limit"
val concurrencyLimit = "whisk.concurrency-limit"
val activation = "whisk.activation"
val userEvents = "whisk.user-events"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ protected class AkkaContainerClient(
case _ =>
//handle missing Content-Length as NoResponseReceived
//also handle 204 as NoResponseReceived, for parity with ApacheBlockingContainerClient client
response.discardEntityBytes().future.map(_ => Left(NoResponseReceived()))
//per https://github.com/akka/akka-http/issues/1459, don't use discardEntityBytes!
//(discardEntityBytes was causing failures in WskUnicodeTests)
response.entity.dataBytes.runWith(Sink.ignore).map(_ => Left(NoResponseReceived()))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,16 @@ trait Container {
}

/** Initializes code in the container. */
def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
def initialize(initializer: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
implicit transid: TransactionId): Future[Interval] = {
val start = transid.started(
this,
LoggingMarkers.INVOKER_ACTIVATION_INIT,
s"sending initialization to $id $addr",
logLevel = InfoLevel)

val body = JsObject("value" -> initializer)
callContainer("/init", body, timeout, retry = true)
callContainer("/init", body, timeout, maxConcurrent, retry = true)
.andThen { // never fails
case Success(r: RunResult) =>
transid.finished(
Expand Down Expand Up @@ -132,7 +133,7 @@ trait Container {
}

/** Runs code in the container. */
def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
val start =
Expand All @@ -144,7 +145,7 @@ trait Container {

val parameterWrapper = JsObject("value" -> parameters)
val body = JsObject(parameterWrapper.fields ++ environment.fields)
callContainer("/run", body, timeout, retry = false)
callContainer("/run", body, timeout, maxConcurrent, retry = false)
.andThen { // never fails
case Success(r: RunResult) =>
transid.finished(
Expand Down Expand Up @@ -177,8 +178,11 @@ trait Container {
* @param timeout timeout of the request
* @param retry whether or not to retry the request
*/
protected def callContainer(path: String, body: JsObject, timeout: FiniteDuration, retry: Boolean = false)(
implicit transid: TransactionId): Future[RunResult] = {
protected def callContainer(path: String,
body: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
val started = Instant.now()
val http = httpConnection.getOrElse {
val conn = if (Container.config.akkaClient) {
Expand All @@ -187,7 +191,8 @@ trait Container {
new ApacheBlockingContainerClient(
s"${addr.host}:${addr.port}",
timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
maxConcurrent)
}
httpConnection = Some(conn)
conn
Expand Down
Loading

0 comments on commit 789e7a3

Please sign in to comment.