diff --git a/lib/Internal/MutexStorage.php b/lib/Internal/MutexStorage.php new file mode 100644 index 0000000..9310a9f --- /dev/null +++ b/lib/Internal/MutexStorage.php @@ -0,0 +1,39 @@ +locked) { + return true; + } + + $this->locked = true; + return false; + }; + + while ($this->locked || $this->synchronized($tsl)) { + yield new Delayed(self::LATENCY_TIMEOUT); + } + + return new Lock(0, function () { + $this->locked = false; + }); + }); + } +} diff --git a/lib/Internal/SemaphoreStorage.php b/lib/Internal/SemaphoreStorage.php new file mode 100644 index 0000000..05fa508 --- /dev/null +++ b/lib/Internal/SemaphoreStorage.php @@ -0,0 +1,60 @@ +count()) { + return null; + } + + return $this->shift(); + }; + + while (!$this->count() || ($id = $this->synchronized($tsl)) === null) { + yield new Delayed(self::LATENCY_TIMEOUT); + } + + return new Lock($id, function (Lock $lock) { + $id = $lock->getId(); + $this->synchronized(function () use ($id) { + $this[] = $id; + }); + }); + }); + } +} diff --git a/lib/ThreadedMutex.php b/lib/ThreadedMutex.php index 546aeac..d18809c 100644 --- a/lib/ThreadedMutex.php +++ b/lib/ThreadedMutex.php @@ -2,9 +2,7 @@ namespace Amp\Sync; -use Amp\Delayed; use Amp\Promise; -use function Amp\call; /** * A thread-safe, asynchronous mutex using the pthreads locking mechanism. @@ -12,43 +10,14 @@ * Compatible with POSIX systems and Microsoft Windows. */ class ThreadedMutex implements Mutex { - /** @var \Threaded */ + /** @var Internal\MutexStorage */ private $mutex; /** * Creates a new threaded mutex. */ public function __construct() { - $this->mutex = new class extends \Threaded { - const LATENCY_TIMEOUT = 10; - - /** @var bool */ - private $locked = false; - - /** - * @return \Amp\Promise - */ - public function acquire(): Promise { - return call(function () { - $tsl = function () { - if ($this->locked) { - return true; - } - - $this->locked = true; - return false; - }; - - while ($this->locked || $this->synchronized($tsl)) { - yield new Delayed(self::LATENCY_TIMEOUT); - } - - return new Lock(0, function () { - $this->locked = false; - }); - }); - } - }; + $this->mutex = new Internal\MutexStorage; } /** diff --git a/lib/ThreadedSemaphore.php b/lib/ThreadedSemaphore.php index e8ff612..8fb0750 100644 --- a/lib/ThreadedSemaphore.php +++ b/lib/ThreadedSemaphore.php @@ -2,9 +2,7 @@ namespace Amp\Sync; -use Amp\Delayed; use Amp\Promise; -use function Amp\call; /** * An asynchronous semaphore based on pthreads' synchronization methods. @@ -28,57 +26,7 @@ public function __construct(int $locks) { throw new \Error("The number of locks should be a positive integer"); } - $this->semaphore = new class($locks) extends \Threaded { - const LATENCY_TIMEOUT = 10; - - /** - * Creates a new semaphore with a given number of locks. - * - * @param int $locks The maximum number of locks that can be acquired from the semaphore. - */ - public function __construct(int $locks) { - foreach (\range(0, $locks - 1) as $lock) { - $this[] = $lock; - } - } - - /** - * @return \Amp\Promise - */ - public function acquire(): Promise { - /** - * Uses a double locking mechanism to acquire a lock without blocking. A - * synchronous mutex is used to make sure that the semaphore is queried one - * at a time to preserve the integrity of the semaphore itself. Then a lock - * count is used to check if a lock is available without blocking. - * - * If a lock is not available, we add the request to a queue and set a timer - * to check again in the future. - */ - return call(function () { - $tsl = function () { - // If there are no locks available or the wait queue is not empty, - // we need to wait our turn to acquire a lock. - if (!$this->count()) { - return null; - } - - return $this->shift(); - }; - - while (!$this->count() || ($id = $this->synchronized($tsl)) === null) { - yield new Delayed(self::LATENCY_TIMEOUT); - } - - return new Lock($id, function (Lock $lock) { - $id = $lock->getId(); - $this->synchronized(function () use ($id) { - $this[] = $id; - }); - }); - }); - } - }; + $this->semaphore = new Internal\SemaphoreStorage($locks); } /** diff --git a/test/ThreadedMutexTest.php b/test/ThreadedMutexTest.php index 1bae33d..5b004c7 100644 --- a/test/ThreadedMutexTest.php +++ b/test/ThreadedMutexTest.php @@ -2,6 +2,7 @@ namespace Amp\Sync\Test; +use Amp\Loop; use Amp\Sync\Mutex; use Amp\Sync\ThreadedMutex; @@ -12,4 +13,38 @@ class ThreadedMutexTest extends AbstractMutexTest { public function createMutex(): Mutex { return new ThreadedMutex; } + + public function testWithinThread() { + $mutex = $this->createMutex(); + + $thread = new class($mutex) extends \Thread { + private $mutex; + + public function __construct(Mutex $mutex) { + $this->mutex = $mutex; + } + + public function run() { + Loop::set((new Loop\DriverFactory)->create()); + Loop::run(function () { + $this->mutex->acquire()->onResolve(function ($exception, $lock) { + if ($exception) { + throw $exception; + } + + Loop::delay(100, [$lock, "release"]); + }); + }); + } + }; + + $this->assertRunTimeGreaterThan(function () use ($mutex, $thread) { + $thread->start(); + + Loop::run(function () use ($mutex) { + $lock = yield $mutex->acquire(); + Loop::delay(100, [$lock, "release"]); + }); + }, 200); + } } diff --git a/test/ThreadedSemaphoreTest.php b/test/ThreadedSemaphoreTest.php index dd997da..b269b42 100644 --- a/test/ThreadedSemaphoreTest.php +++ b/test/ThreadedSemaphoreTest.php @@ -2,6 +2,7 @@ namespace Amp\Sync\Test; +use Amp\Loop; use Amp\Sync\Semaphore; use Amp\Sync\ThreadedSemaphore; @@ -12,4 +13,38 @@ class ThreadedSemaphoreTest extends AbstractSemaphoreTest { public function createSemaphore(int $locks): Semaphore { return new ThreadedSemaphore($locks); } + + public function testWithinThread() { + $semaphore = $this->createSemaphore(1); + + $thread = new class($semaphore) extends \Thread { + private $semaphore; + + public function __construct(Semaphore $semaphore) { + $this->semaphore = $semaphore; + } + + public function run() { + Loop::set((new Loop\DriverFactory)->create()); + Loop::run(function () { + $this->semaphore->acquire()->onResolve(function ($exception, $lock) { + if ($exception) { + throw $exception; + } + + Loop::delay(100, [$lock, "release"]); + }); + }); + } + }; + + $this->assertRunTimeGreaterThan(function () use ($semaphore, $thread) { + $thread->start(); + + Loop::run(function () use ($semaphore) { + $lock = yield $semaphore->acquire(); + Loop::delay(100, [$lock, "release"]); + }); + }, 200); + } }