Skip to content

Commit

Permalink
track per action operations so that map keys can be released
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonnorris committed Sep 16, 2018
1 parent e6955d6 commit 9c94647
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 49 deletions.
46 changes: 12 additions & 34 deletions common/scala/src/main/scala/whisk/common/NestedSemaphore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package whisk.common

import java.util.concurrent.Semaphore
import scala.collection.concurrent.TrieMap

/**
Expand All @@ -28,7 +27,6 @@ import scala.collection.concurrent.TrieMap
* @tparam T
*/
class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPermits) {
private val actionLocks = TrieMap.empty[T, Semaphore]
private val actionConcurrentSlotsMap = TrieMap.empty[T, ResizableSemaphore] //one key per action; resized per container

final def tryAcquireConcurrent(actionid: T, maxConcurrent: Int, memoryPermits: Int): Boolean = {
Expand Down Expand Up @@ -69,39 +67,17 @@ class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPer
concurrentSlots.synchronized {
if (concurrentSlots.tryAcquire(1)) {
true
} else if (force) {
super.forceAcquire(memoryPermits)
concurrentSlots.release(maxConcurrent - 1, false)
true
} else if (super.tryAcquire(memoryPermits)) {
concurrentSlots.release(maxConcurrent - 1)
concurrentSlots.release(maxConcurrent - 1, false)
true
} else {
false
}
}

// //without synchonized:
// val actionMutex = actionLocks.getOrElseUpdate(actionid, {
// new Semaphore(1)
// })
//
// if (actionMutex.tryAcquire()) {
// //double check concurrentSlots, then attempt memory aquire for new container, then false
// val result = if (concurrentSlots.tryAcquire(1)) {
// true
// } else if (force) {
// super.forceAcquire(memoryPermits)
// concurrentSlots.release(maxConcurrent - 1)
// true
// } else if (super.tryAcquire(memoryPermits)) {
// concurrentSlots.release(maxConcurrent - 1)
// true
// } else {
// false
// }
// actionMutex.release(1)
// result
// } else {
// tryOrForceAcquireConcurrent(actionid, maxConcurrent, memoryPermits, force)
// }

}
}

Expand All @@ -124,13 +100,15 @@ class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPer
if (maxConcurrent == 1) {
super.release(memoryPermits)
} else {
val concurrentSlots = actionConcurrentSlotsMap
.getOrElseUpdate(actionid, new ResizableSemaphore(0, maxConcurrent))

if (concurrentSlots.release(1)) {
val concurrentSlots = actionConcurrentSlotsMap(actionid)
val (memoryRelease, actionRelease) = concurrentSlots.release(1, true)
//concurrent slots
if (memoryRelease) {
super.release(memoryPermits)
}

if (actionRelease) {
actionConcurrentSlotsMap.remove(actionid)
}
}
}
//for testing
Expand Down
19 changes: 17 additions & 2 deletions common/scala/src/main/scala/whisk/common/ResizableSemaphore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@

package whisk.common

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.AbstractQueuedSynchronizer
import scala.annotation.tailrec

/**
* A Semaphore that has a specialized release process that optionally allows reduction of permits in batches.
* When permit size after release is a factor of reductionSize, the release process will reset permits to state + 1 - reductionSize;
* otherwise the release will reset permits to state + 1.
* It also maintains an operationCount where a tryAquire + release is a single operation,
* so that we can know once all operations are completed.
* @param maxAllowed
* @param reductionSize
*/
class ResizableSemaphore(maxAllowed: Int, reductionSize: Int) {
private val operationCount = new AtomicInteger(0)
class Sync extends AbstractQueuedSynchronizer {
setState(maxAllowed)

Expand Down Expand Up @@ -78,6 +82,7 @@ class ResizableSemaphore(maxAllowed: Int, reductionSize: Int) {
def tryAcquire(acquires: Int = 1): Boolean = {
require(acquires > 0, "cannot acquire negative or no permits")
if (sync.nonFairTryAcquireShared(acquires) >= 0) {
operationCount.incrementAndGet()
true
} else {
false
Expand All @@ -88,13 +93,23 @@ class ResizableSemaphore(maxAllowed: Int, reductionSize: Int) {
* Releases the given amount of permits
*
* @param acquires the number of permits to release
* @return (releaseMemory, releaseAction) releaseMemory is true if concurrency count is a factor of reductionSize
* releaseAction is true if the operationCount reaches 0
*/
def release(acquires: Int = 1): Boolean = {
def release(acquires: Int = 1, opComplete: Boolean): (Boolean, Boolean) = {
require(acquires > 0, "cannot release negative or no permits")
sync.tryReleaseSharedWithResult(acquires)
//release always succeeds, so we can always adjust the operationCount
val releaseAction = if (opComplete) { // an operation completion
operationCount.decrementAndGet() == 0
} else { //otherwise an allocation + operation initialization
operationCount.incrementAndGet() == 0
}
(sync.tryReleaseSharedWithResult(acquires), releaseAction)
}

/** Returns the number of currently available permits. Possibly negative. */
def availablePermits: Int = sync.permits

//for testing
def counter = operationCount.get()
}
99 changes: 93 additions & 6 deletions tests/src/test/scala/whisk/common/ResizableSemaphoreTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class ResizableSemaphoreTests extends FlatSpec with Matchers {
an[IllegalArgumentException] should be thrownBy s.tryAcquire(0)
an[IllegalArgumentException] should be thrownBy s.tryAcquire(-1)

an[IllegalArgumentException] should be thrownBy s.release(0)
an[IllegalArgumentException] should be thrownBy s.release(-1)
an[IllegalArgumentException] should be thrownBy s.release(0, true)
an[IllegalArgumentException] should be thrownBy s.release(-1, true)
}

it should "allow to acquire the defined amount of permits only" in {
Expand Down Expand Up @@ -62,24 +62,100 @@ class ResizableSemaphoreTests extends FlatSpec with Matchers {
it should "allow to resize permits when factor of reductionSize is reached during release" in {
val s = new ResizableSemaphore(2, 5)
s.tryAcquire() shouldBe true // 1 permit left
s.counter shouldBe 1
s.tryAcquire() shouldBe true // 0 permits left
s.counter shouldBe 2
s.tryAcquire() shouldBe false
s.release(4) shouldBe false // 4 permits left
s.counter shouldBe 2
s.release(4, false) shouldBe (false, false) // 4 permits left
s.counter shouldBe 3

s.tryAcquire(4) shouldBe true
s.counter shouldBe 4

s.tryAcquire() shouldBe false
s.release(5) shouldBe true // 0 permits left (5 permits reduced to 0)
s.counter shouldBe 4
s.release(5, false) shouldBe (true, false) // 0 permits left (5 permits reduced to 0)
s.counter shouldBe 5
s.tryAcquire() shouldBe false
s.release(6) shouldBe false // 5 permits left
s.counter shouldBe 5
s.release(6, false) shouldBe (false, false) // 5 permits left
s.counter shouldBe 6
s.tryAcquire() shouldBe true // 5 permits left
s.counter shouldBe 7
s.tryAcquire() shouldBe true // 4 permits left
s.counter shouldBe 8
s.tryAcquire() shouldBe true // 3 permits left
s.counter shouldBe 9
s.tryAcquire() shouldBe true // 2 permits left
s.counter shouldBe 10
s.tryAcquire() shouldBe true // 1 permits left
s.counter shouldBe 11
s.tryAcquire() shouldBe true // 0 permits left
s.counter shouldBe 12

s.tryAcquire() shouldBe false
s.release(10) shouldBe true // 5 permits left (10 permits reduced to 5)
s.counter shouldBe 12
s.release(10, false) shouldBe (true, false) // 5 permits left (10 permits reduced to 5)
s.counter shouldBe 13
s.tryAcquire() shouldBe true
s.counter shouldBe 14
s.availablePermits shouldBe 4
s.release(1, true) shouldBe (true, false)
s.counter shouldBe 13
s.availablePermits shouldBe 0

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 12
s.availablePermits shouldBe 1

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 11
s.availablePermits shouldBe 2

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 10
s.availablePermits shouldBe 3

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 9
s.availablePermits shouldBe 4

s.release(1, true) shouldBe (true, false)
s.counter shouldBe 8
s.availablePermits shouldBe 0

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 7
s.availablePermits shouldBe 1

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 6
s.availablePermits shouldBe 2

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 5
s.availablePermits shouldBe 3

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 4
s.availablePermits shouldBe 4

s.release(1, true) shouldBe (true, false)
s.counter shouldBe 3
s.availablePermits shouldBe 0

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 2
s.availablePermits shouldBe 1

s.release(1, true) shouldBe (false, false)
s.counter shouldBe 1
s.availablePermits shouldBe 2

s.release(1, true) shouldBe (false, true)
s.counter shouldBe 0
s.availablePermits shouldBe 3
}

it should "not give away more permits even under concurrent load" in {
Expand All @@ -93,4 +169,15 @@ class ResizableSemaphoreTests extends FlatSpec with Matchers {
acquires should contain theSameElementsAs result
}
}

it should "release permits even under concurrent load" in {
val s = new ResizableSemaphore(32, 35)
// try to acquire more permits than allowed in parallel
val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq

(0 until 32).par.map(_ => s.release(1, true))
s.counter shouldBe 0

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -499,12 +499,12 @@ class ShardingContainerPoolBalancerTests
var nextInvoker = home
ids.toList.grouped(maxActivationsPerInvoker).zipWithIndex.foreach { g =>
val remaining = rem(g._1.size)
val leftover = balancer.schedulingState._invokerSlots
val concurrentState = balancer.schedulingState._invokerSlots
.lift(nextInvoker)
.get
.concurrentState(fqn)
.availablePermits
leftover shouldBe remaining
concurrentState.availablePermits shouldBe remaining
concurrentState.counter shouldBe g._1.size
nextInvoker = (nextInvoker + stepSize) % numInvokers
}

Expand All @@ -518,16 +518,16 @@ class ShardingContainerPoolBalancerTests

//verify invokers go back to unused state
invokers.foreach { i =>
val invokerStates = balancer.schedulingState._invokerSlots
val concurrentState = balancer.schedulingState._invokerSlots
.lift(i.id.toInt)
.get
.concurrentState
.get(fqn)

invokerStates.map { cstate =>
cstate.availablePermits shouldBe 0
concurrentState shouldBe None
balancer.schedulingState._invokerSlots.lift(i.id.toInt).map { i =>
i.availablePermits shouldBe invokerMem.toMB
}
balancer.schedulingState._invokerSlots.lift(i.id.toInt).map(_.availablePermits shouldBe invokerMem.toMB)

}
}
Expand Down

0 comments on commit 9c94647

Please sign in to comment.