util-core: lock-free async semaphore

Problem

The current implementation of `AsyncSemaphore` is lock-based and has proven too
costly under contention, which I also observed while testing the fiber scheduler
implementation.

Solution

Migrate to a lock-free implementation, similar to how semaphores are implemented
in Linux, by using a single atomic counter that encodes either the number of
available permits or the number of waiters. The happy path uses a single CAS;
if no permits are available, waiters are parked in a concurrent linked queue.
Since the queue incurs additional allocations, I've also optimized the code to
avoid allocations in a few other places to compensate.
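
To illustrate the state encoding, here is a minimal, hypothetical sketch of the
approach (not the patch itself, which follows in the diff below): it omits
`maxWaiters`, `fail`, interrupt handling, and the allocation tricks, and hands
out a bare `Future[Unit]` instead of a `Permit`.

```scala
import com.twitter.util.{Future, Promise, Return}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

// Hypothetical, simplified sketch: a positive state means free permits,
// a negative state means waiters parked (or about to be parked) in `waiters`.
class LockFreeSemaphoreSketch(initialPermits: Int) {
  private[this] val state = new AtomicInteger(initialPermits)
  private[this] val waiters = new ConcurrentLinkedQueue[Promise[Unit]]

  @tailrec
  final def acquire(): Future[Unit] = {
    val s = state.get()
    if (!state.compareAndSet(s, s - 1)) acquire() // lost a race, retry
    else if (s > 0) Future.Done // happy path: a single CAS, no allocation
    else { // no permits left: park as a waiter
      val w = new Promise[Unit]
      waiters.add(w)
      w
    }
  }

  @tailrec
  final def release(): Unit = {
    val s = state.get()
    if (!state.compareAndSet(s, s + 1)) release() // lost a race, retry
    else if (s < 0) {
      // The counter went below zero, so a waiter is (or will shortly be) in
      // the queue; spin until it appears and hand it the permit.
      var w = waiters.poll()
      while (w == null) w = waiters.poll()
      w.updateIfEmpty(Return.Unit)
    }
  }
}
```

The key invariant is that a negative counter value guarantees a matching waiter
will show up in the queue, which is why `release` can poll until one appears.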

Result

The new implementation is faster and allocates less per operation:

baseline
```
[info] Benchmark                                                        Mode  Cnt     Score     Error   Units
[info] AsyncSemaphoreBenchmark.mixed                                    avgt   10  1265.338 ±  97.302   ns/op
[info] AsyncSemaphoreBenchmark.mixed:·gc.alloc.rate                     avgt   10   258.451 ±  14.962  MB/sec
[info] AsyncSemaphoreBenchmark.mixed:·gc.alloc.rate.norm                avgt   10    50.551 ±   0.590    B/op
[info] AsyncSemaphoreBenchmark.mixed:·gc.churn.Par_Eden_Space           avgt   10   260.352 ± 138.484  MB/sec
[info] AsyncSemaphoreBenchmark.mixed:·gc.churn.Par_Eden_Space.norm      avgt   10    51.108 ±  28.172    B/op
[info] AsyncSemaphoreBenchmark.mixed:·gc.count                          avgt   10    12.000            counts
[info] AsyncSemaphoreBenchmark.mixed:·gc.time                           avgt   10     6.000                ms
[info] AsyncSemaphoreBenchmark.noWaiters                                avgt   10   806.794 ± 208.589   ns/op
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.alloc.rate                 avgt   10   131.521 ±  35.256  MB/sec
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.alloc.rate.norm            avgt   10    16.000 ±   0.001    B/op
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.churn.Par_Eden_Space       avgt   10   130.410 ± 169.691  MB/sec
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.churn.Par_Eden_Space.norm  avgt   10    16.496 ±  22.444    B/op
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.count                      avgt   10     6.000            counts
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.time                       avgt   10     5.000                ms
[info] AsyncSemaphoreBenchmark.waiters                                  avgt   10  3671.510 ± 773.611   ns/op
[info] AsyncSemaphoreBenchmark.waiters:·gc.alloc.rate                   avgt   10   213.112 ±  22.384  MB/sec
[info] AsyncSemaphoreBenchmark.waiters:·gc.alloc.rate.norm              avgt   10   120.443 ±  11.879    B/op
[info] AsyncSemaphoreBenchmark.waiters:·gc.churn.Par_Eden_Space         avgt   10   215.990 ±   0.763  MB/sec
[info] AsyncSemaphoreBenchmark.waiters:·gc.churn.Par_Eden_Space.norm    avgt   10   123.114 ±  26.471    B/op
[info] AsyncSemaphoreBenchmark.waiters:·gc.count                        avgt   10    10.000            counts
[info] AsyncSemaphoreBenchmark.waiters:·gc.time                         avgt   10     7.000                ms
```

optimized
```
[info] Benchmark                                                        Mode  Cnt     Score     Error   Units
[info] AsyncSemaphoreBenchmark.mixed                                    avgt   10   826.948 ±  33.943   ns/op
[info] AsyncSemaphoreBenchmark.mixed:·gc.alloc.rate                     avgt   10   383.787 ±  14.822  MB/sec
[info] AsyncSemaphoreBenchmark.mixed:·gc.alloc.rate.norm                avgt   10    48.244 ±   0.177    B/op
[info] AsyncSemaphoreBenchmark.mixed:·gc.churn.Par_Eden_Space           avgt   10   391.142 ±  99.118  MB/sec
[info] AsyncSemaphoreBenchmark.mixed:·gc.churn.Par_Eden_Space.norm      avgt   10    49.164 ±  12.446    B/op
[info] AsyncSemaphoreBenchmark.mixed:·gc.count                          avgt   10    19.000            counts
[info] AsyncSemaphoreBenchmark.mixed:·gc.time                           avgt   10    12.000                ms
[info] AsyncSemaphoreBenchmark.noWaiters                                avgt   10   735.404 ±  24.172   ns/op
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.alloc.rate                 avgt   10   145.514 ±   4.828  MB/sec
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.alloc.rate.norm            avgt   10    16.000 ±   0.001    B/op
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.churn.Par_Eden_Space       avgt   10   143.113 ± 149.378  MB/sec
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.churn.Par_Eden_Space.norm  avgt   10    15.715 ±  16.429    B/op
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.count                      avgt   10     7.000            counts
[info] AsyncSemaphoreBenchmark.noWaiters:·gc.time                       avgt   10     5.000                ms
[info] AsyncSemaphoreBenchmark.waiters                                  avgt   10  2518.887 ± 781.773   ns/op
[info] AsyncSemaphoreBenchmark.waiters:·gc.alloc.rate                   avgt   10   278.906 ±  49.568  MB/sec
[info] AsyncSemaphoreBenchmark.waiters:·gc.alloc.rate.norm              avgt   10   104.685 ±  17.330    B/op
[info] AsyncSemaphoreBenchmark.waiters:·gc.churn.Par_Eden_Space         avgt   10   281.048 ± 157.356  MB/sec
[info] AsyncSemaphoreBenchmark.waiters:·gc.churn.Par_Eden_Space.norm    avgt   10   104.627 ±  55.264    B/op
[info] AsyncSemaphoreBenchmark.waiters:·gc.count                        avgt   10    13.000            counts
[info] AsyncSemaphoreBenchmark.waiters:·gc.time                         avgt   10     9.000                ms
```

JIRA Issues: CSL-8426, VM-4536

Differential Revision: https://phabricator.twitter.biz/D663434
Flavio Brasil authored and jenkins committed May 10, 2021
1 parent 04e4be3 commit 07d525b
Showing 3 changed files with 137 additions and 64 deletions.
@@ -8,10 +8,22 @@ import org.openjdk.jmh.annotations._
@Threads(10)
class AsyncSemaphoreBenchmark extends StdBenchAnnotations {

private[this] val semaphore: AsyncSemaphore = new AsyncSemaphore(1)
private[this] val noWaitersSemaphore: AsyncSemaphore = new AsyncSemaphore(10)
private[this] val waitersSemaphore: AsyncSemaphore = new AsyncSemaphore(1)
private[this] val mixedSemaphore: AsyncSemaphore = new AsyncSemaphore(5)

@Benchmark
def acquireAndRelease(): Unit = {
Await.result(semaphore.acquire().map(_.release()))
def noWaiters(): Unit = {
Await.result(noWaitersSemaphore.acquire().map(_.release()))
}

@Benchmark
def waiters(): Unit = {
Await.result(waitersSemaphore.acquire().map(_.release()))
}

@Benchmark
def mixed(): Unit = {
Await.result(mixedSemaphore.acquire().map(_.release()))
}
}
145 changes: 85 additions & 60 deletions util-core/src/main/scala/com/twitter/concurrent/AsyncSemaphore.scala
@@ -1,10 +1,9 @@
package com.twitter.concurrent

import com.twitter.util._
import java.util.ArrayDeque
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.{ConcurrentLinkedQueue, RejectedExecutionException}
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

/**
@@ -50,55 +49,78 @@ class AsyncSemaphore protected (initialPermits: Int, maxWaiters: Option[Int]) {
require(maxWaiters.getOrElse(0) >= 0, s"maxWaiters must be non-negative: $maxWaiters")
require(initialPermits > 0, s"initialPermits must be positive: $initialPermits")

// access to `closed`, `waitq`, and `availablePermits` is synchronized
// by locking on `lock`
private[this] var closed: Option[Throwable] = None
private[this] val waitq = new ArrayDeque[Promise[Permit]]
private[this] var availablePermits = initialPermits
// if failedState (Int.MinValue), failed
// else if >= 0, # of available permits
// else if < 0, # of waiters
private[this] final val state = new AtomicInteger(initialPermits)
@volatile private[this] final var failure: Future[Nothing] = null

// Serves as our intrinsic lock.
private[this] final def lock: Object = waitq
private[this] final val failedState = Int.MinValue
private[this] final val waiters = new ConcurrentLinkedQueue[Waiter]
private[this] final val waitersLimit = maxWaiters.map(v => -v).getOrElse(failedState + 1)

private[this] val semaphorePermit = new Permit {
private[this] val ReturnThis = Return(this)

@tailrec override def release(): Unit = {
val waiter = lock.synchronized {
val next = waitq.pollFirst()
if (next == null) {
availablePermits += 1
}
next
}

if (waiter != null) {
private[this] final val permit = new Permit {
override final def release(): Unit = AsyncSemaphore.this.release()
}
private[this] final val permitFuture = Future.value(permit)
private[this] final val permitReturn = Return(permit)
private[this] final val releasePermit: Any => Unit = _ => permit.release()

@tailrec private[this] final def release(): Unit = {
val s = state.get()
if (s != failedState) {
if (!state.compareAndSet(s, s + 1) ||
// If the waiter is already satisfied, it must
// have been interrupted, so we can simply move on.
if (!waiter.updateIfEmpty(ReturnThis)) {
release()
}
(s < 0 && !pollWaiter().updateIfEmpty(permitReturn))) {
release()
}
}
}

private[this] val futurePermit = Future.value(semaphorePermit)
def numWaiters: Int = {
val s = state.get()
if (s < 0 && s != failedState) -s
else 0
}

def numWaiters: Int = lock.synchronized(waitq.size)
def numInitialPermits: Int = initialPermits
def numPermitsAvailable: Int = lock.synchronized(availablePermits)

def numPermitsAvailable: Int = {
val s = state.get()
if (s > 0) s
else 0
}

/**
* Fail the semaphore and stop it from distributing further permits. Subsequent
* attempts to acquire a permit fail with `exc`. This semaphore's queued waiters
* are also failed with `exc`.
*/
def fail(exc: Throwable): Unit = {
val drained = lock.synchronized {
closed = Some(exc)
waitq.asScala.toList
failure = Future.exception(exc)
var s = state.getAndSet(failedState)
// drain the wait queue
while (s < 0 && s != failedState) {
pollWaiter().raise(exc)
s += 1
}
}

/**
* Polls from the wait queue until it returns an item. Must be called
* after a state modification that indicates that a new waiter will be available
* shortly. The retry accounts for the race when `acquire` updates the state to
* indicate that a waiter is available but it hasn't been added to the queue yet.
* @return
*/
@tailrec private[this] final def pollWaiter(): Waiter = {
val w = waiters.poll()
if (w != null) {
w
} else {
pollWaiter()
}
// delegate dequeuing to the interrupt handler defined in #acquire
drained.foreach(_.raise(exc))
}

/**
@@ -116,29 +138,26 @@ class AsyncSemaphore protected (initialPermits: Int, maxWaiters: Option[Int]) {
* or a Future.Exception[RejectedExecutionException]` if the configured maximum
* number of waiters would be exceeded.
*/
def acquire(): Future[Permit] = lock.synchronized {
if (closed.isDefined)
return Future.exception(closed.get)

if (availablePermits > 0) {
availablePermits -= 1
futurePermit
} else {
maxWaiters match {
case Some(max) if waitq.size >= max =>
MaxWaitersExceededException
case _ =>
val promise = new Promise[Permit]
promise.setInterruptHandler {
case t: Throwable =>
if (promise.updateIfEmpty(Throw(t))) lock.synchronized {
waitq.remove(promise)
}
}
waitq.addLast(promise)
promise
def acquire(): Future[Permit] = {
@tailrec def loop(): Future[Permit] = {
val s = state.get()
if (s == failedState) {
failure
} else if (s == waitersLimit) {
AsyncSemaphore.MaxWaitersExceededException
} else if (state.compareAndSet(s, s - 1)) {
if (s > 0) {
permitFuture
} else {
val w = new Waiter
waiters.add(w)
w
}
} else {
loop()
}
}
loop()
}

/**
@@ -162,9 +181,7 @@ class AsyncSemaphore protected (initialPermits: Int, maxWaiters: Option[Int]) {
permit.release()
throw e
}
f.ensure {
permit.release()
}
f.respond(releasePermit)
}

/**
@@ -178,14 +195,22 @@ class AsyncSemaphore protected (initialPermits: Int, maxWaiters: Option[Int]) {
* returned.
*/
def acquireAndRunSync[T](func: => T): Future[T] =
acquire().flatMap { permit =>
Future(func).ensure {
acquire().map { _ =>
try {
func
} finally {
permit.release()
}
}
}

object AsyncSemaphore {

private final class Waiter extends Promise[Permit] with Promise.InterruptHandler {
// the waiter will be removed from the queue later when a permit is released
override protected def onInterrupt(t: Throwable): Unit = updateIfEmpty(Throw(t))
}

private val MaxWaitersExceededException =
Future.exception(new RejectedExecutionException("Max waiters exceeded"))
}
@@ -2,7 +2,14 @@ package com.twitter.concurrent

import com.twitter.conversions.DurationOps._
import com.twitter.util._
import java.util.concurrent.{ConcurrentLinkedQueue, RejectedExecutionException, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{
ConcurrentLinkedQueue,
CountDownLatch,
Executors,
RejectedExecutionException,
ThreadLocalRandom
}
import scala.collection.mutable
import org.scalatest.Outcome
import org.scalatest.funspec.FixtureAnyFunSpec
@@ -387,5 +394,34 @@ class AsyncSemaphoreTest extends FixtureAnyFunSpec {
val msgs = results.collect { case Some(Throw(e)) => e.getMessage }
assert(msgs.forall(_ == "woop"))
}

it("behaves properly under contention") { () =>
val as = new AsyncSemaphore(5)
val exec = Executors.newCachedThreadPool()
val pool = FuturePool(exec)
val active = new AtomicInteger(0)
try {
val done =
Future.collect {
for (i <- 0 until 10) yield {
pool {
for (j <- 0 until 100) {
val res =
as.acquireAndRunSync {
assert(active.incrementAndGet() <= as.numInitialPermits)
Thread.sleep(ThreadLocalRandom.current().nextInt(10))
assert(active.decrementAndGet() <= as.numInitialPermits)
i * j
}
assert(await(res) == i * j)
}
}
}
}
await(done)
} finally {
exec.shutdown()
}
}
}
}
