Skip to content

Commit

Permalink
Anonymous class → defined class
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 29, 2017
1 parent fbc8842 commit a1d8f24
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 86 deletions.
39 changes: 39 additions & 0 deletions lib/Internal/MutexStorage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

namespace Amp\Sync\Internal;

use Amp\Delayed;
use Amp\Promise;
use Amp\Sync\Lock;
use function Amp\call;

class MutexStorage extends \Threaded {
const LATENCY_TIMEOUT = 10;

/** @var bool */
private $locked = false;

/**
* @return \Amp\Promise
*/
public function acquire(): Promise {
return call(function () {
$tsl = function () {
if ($this->locked) {
return true;
}

$this->locked = true;
return false;
};

while ($this->locked || $this->synchronized($tsl)) {
yield new Delayed(self::LATENCY_TIMEOUT);
}

return new Lock(0, function () {
$this->locked = false;
});
});
}
}
60 changes: 60 additions & 0 deletions lib/Internal/SemaphoreStorage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

namespace Amp\Sync\Internal;

use Amp\Delayed;
use Amp\Promise;
use Amp\Sync\Lock;
use function Amp\call;

class SemaphoreStorage extends \Threaded {
const LATENCY_TIMEOUT = 10;

/**
* Creates a new semaphore with a given number of locks.
*
* @param int $locks The maximum number of locks that can be acquired from the semaphore.
*/
public function __construct(int $locks) {
foreach (\range(0, $locks - 1) as $lock) {
$this[] = $lock;
}
}

/**
* @return \Amp\Promise
*/
public function acquire(): Promise {
/**
* Uses a double locking mechanism to acquire a lock without blocking. A
* synchronous mutex is used to make sure that the semaphore is queried one
* at a time to preserve the integrity of the semaphore itself. Then a lock
* count is used to check if a lock is available without blocking.
*
* If a lock is not available, we add the request to a queue and set a timer
* to check again in the future.
*/
return call(function () {
$tsl = function () {
// If there are no locks available or the wait queue is not empty,
// we need to wait our turn to acquire a lock.
if (!$this->count()) {
return null;
}

return $this->shift();
};

while (!$this->count() || ($id = $this->synchronized($tsl)) === null) {
yield new Delayed(self::LATENCY_TIMEOUT);
}

return new Lock($id, function (Lock $lock) {
$id = $lock->getId();
$this->synchronized(function () use ($id) {
$this[] = $id;
});
});
});
}
}
35 changes: 2 additions & 33 deletions lib/ThreadedMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,22 @@

namespace Amp\Sync;

use Amp\Delayed;
use Amp\Promise;
use function Amp\call;

/**
* A thread-safe, asynchronous mutex using the pthreads locking mechanism.
*
* Compatible with POSIX systems and Microsoft Windows.
*/
class ThreadedMutex implements Mutex {
/** @var \Threaded */
/** @var Internal\MutexStorage */
private $mutex;

/**
* Creates a new threaded mutex.
*/
public function __construct() {
$this->mutex = new class extends \Threaded {
const LATENCY_TIMEOUT = 10;

/** @var bool */
private $locked = false;

/**
* @return \Amp\Promise
*/
public function acquire(): Promise {
return call(function () {
$tsl = function () {
if ($this->locked) {
return true;
}

$this->locked = true;
return false;
};

while ($this->locked || $this->synchronized($tsl)) {
yield new Delayed(self::LATENCY_TIMEOUT);
}

return new Lock(0, function () {
$this->locked = false;
});
});
}
};
$this->mutex = new Internal\MutexStorage;
}

/**
Expand Down
54 changes: 1 addition & 53 deletions lib/ThreadedSemaphore.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

namespace Amp\Sync;

use Amp\Delayed;
use Amp\Promise;
use function Amp\call;

/**
* An asynchronous semaphore based on pthreads' synchronization methods.
Expand All @@ -28,57 +26,7 @@ public function __construct(int $locks) {
throw new \Error("The number of locks should be a positive integer");
}

$this->semaphore = new class($locks) extends \Threaded {
const LATENCY_TIMEOUT = 10;

/**
* Creates a new semaphore with a given number of locks.
*
* @param int $locks The maximum number of locks that can be acquired from the semaphore.
*/
public function __construct(int $locks) {
foreach (\range(0, $locks - 1) as $lock) {
$this[] = $lock;
}
}

/**
* @return \Amp\Promise
*/
public function acquire(): Promise {
/**
* Uses a double locking mechanism to acquire a lock without blocking. A
* synchronous mutex is used to make sure that the semaphore is queried one
* at a time to preserve the integrity of the semaphore itself. Then a lock
* count is used to check if a lock is available without blocking.
*
* If a lock is not available, we add the request to a queue and set a timer
* to check again in the future.
*/
return call(function () {
$tsl = function () {
// If there are no locks available or the wait queue is not empty,
// we need to wait our turn to acquire a lock.
if (!$this->count()) {
return null;
}

return $this->shift();
};

while (!$this->count() || ($id = $this->synchronized($tsl)) === null) {
yield new Delayed(self::LATENCY_TIMEOUT);
}

return new Lock($id, function (Lock $lock) {
$id = $lock->getId();
$this->synchronized(function () use ($id) {
$this[] = $id;
});
});
});
}
};
$this->semaphore = new Internal\SemaphoreStorage($locks);
}

/**
Expand Down
35 changes: 35 additions & 0 deletions test/ThreadedMutexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Sync\Test;

use Amp\Loop;
use Amp\Sync\Mutex;
use Amp\Sync\ThreadedMutex;

Expand All @@ -12,4 +13,38 @@ class ThreadedMutexTest extends AbstractMutexTest {
public function createMutex(): Mutex {
return new ThreadedMutex;
}

public function testWithinThread() {
$mutex = $this->createMutex();

$thread = new class($mutex) extends \Thread {
private $mutex;

public function __construct(Mutex $mutex) {
$this->mutex = $mutex;
}

public function run() {
Loop::set((new Loop\DriverFactory)->create());
Loop::run(function () {
$this->mutex->acquire()->onResolve(function ($exception, $lock) {
if ($exception) {
throw $exception;
}

Loop::delay(100, [$lock, "release"]);
});
});
}
};

$this->assertRunTimeGreaterThan(function () use ($mutex, $thread) {
$thread->start();

Loop::run(function () use ($mutex) {
$lock = yield $mutex->acquire();
Loop::delay(100, [$lock, "release"]);
});
}, 200);
}
}
35 changes: 35 additions & 0 deletions test/ThreadedSemaphoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Sync\Test;

use Amp\Loop;
use Amp\Sync\Semaphore;
use Amp\Sync\ThreadedSemaphore;

Expand All @@ -12,4 +13,38 @@ class ThreadedSemaphoreTest extends AbstractSemaphoreTest {
public function createSemaphore(int $locks): Semaphore {
return new ThreadedSemaphore($locks);
}

public function testWithinThread() {
$semaphore = $this->createSemaphore(1);

$thread = new class($semaphore) extends \Thread {
private $semaphore;

public function __construct(Semaphore $semaphore) {
$this->semaphore = $semaphore;
}

public function run() {
Loop::set((new Loop\DriverFactory)->create());
Loop::run(function () {
$this->semaphore->acquire()->onResolve(function ($exception, $lock) {
if ($exception) {
throw $exception;
}

Loop::delay(100, [$lock, "release"]);
});
});
}
};

$this->assertRunTimeGreaterThan(function () use ($semaphore, $thread) {
$thread->start();

Loop::run(function () use ($semaphore) {
$lock = yield $semaphore->acquire();
Loop::delay(100, [$lock, "release"]);
});
}, 200);
}
}

0 comments on commit a1d8f24

Please sign in to comment.