diff --git a/examples/swoole.php b/examples/swoole.php new file mode 100644 index 0000000..7d19a61 --- /dev/null +++ b/examples/swoole.php @@ -0,0 +1,26 @@ + SWOOLE_HOOK_ALL]); +Co\Run(function () { + $barrier = Barrier::make(); + for ($i = 0; $i < 3; $i++) { + go(function () use ($barrier) { + $rpc = new Goridge\RPC\RPC( + Goridge\Relay::create('tcp://127.0.0.1:6001') + ); + echo $rpc->call('App.Hi', 'Antony'); + }); + } + Barrier::wait($barrier); +}); diff --git a/src/RPC/RPC.php b/src/RPC/RPC.php index 1e07e9f..adce49a 100644 --- a/src/RPC/RPC.php +++ b/src/RPC/RPC.php @@ -33,7 +33,7 @@ class RPC implements RPCInterface /** * @var positive-int */ - private static int $seq = 1; + private int $seq = 1; /** * @param RelayInterface $relay @@ -85,11 +85,11 @@ public function call(string $method, $payload, $options = null) throw new RPCException('Invalid RPC frame, options missing'); } - if ($frame->options[0] !== self::$seq) { + if ($frame->options[0] !== $this->seq) { throw new RPCException('Invalid RPC frame, sequence mismatch'); } - self::$seq++; + $this->seq++; return $this->decodeResponse($frame, $options); } @@ -170,6 +170,6 @@ private function packFrame(string $method, $payload): Frame } $body = $method . $this->codec->encode($payload); - return new Frame($body, [self::$seq, \strlen($method)], $this->codec->getIndex()); + return new Frame($body, [$this->seq, \strlen($method)], $this->codec->getIndex()); } } diff --git a/tests/Goridge/RPCTest.php b/tests/Goridge/RPCTest.php index b786320..6468c58 100644 --- a/tests/Goridge/RPCTest.php +++ b/tests/Goridge/RPCTest.php @@ -6,6 +6,7 @@ use Exception; use PHPUnit\Framework\TestCase; +use Spiral\Goridge\Frame; use Spiral\Goridge\RelayInterface; use Spiral\Goridge\RPC\Codec\RawCodec; use Spiral\Goridge\RPC\Exception\CodecException; @@ -155,7 +156,7 @@ public function testLongRawBody(): void { $conn = $this->makeRPC(); $payload = random_bytes(65000 * 1000); - + $resp = $conn->withCodec(new RawCodec())->call( 'Service.EchoBinary', $payload @@ -248,6 +249,54 @@ public function testJsonException(): void $conn->call('Service.Process', random_bytes(256)); } + /** + * @doesNotPerformAssertions + */ + public function testCallSequence(): void + { + $relay1 = $this->createMock(RelayInterface::class); + $relay1 + ->method('waitFrame') + ->willReturnOnConsecutiveCalls( + new Frame('Service.Process{}', [1, 15]), + new Frame('Service.Process{}', [2, 15]), + new Frame('Service.Process{}', [3, 15]) + ); + $relay1 + ->method('send') + ->withConsecutive( + [new Frame('Service.Process{"Name":"foo","Value":18}', [1, 15], 8)], + [new Frame('Service.Process{"Name":"foo","Value":18}', [2, 15], 8)], + [new Frame('Service.Process{"Name":"foo","Value":18}', [3, 15], 8)] + ); + + $relay2 = $this->createMock(RelayInterface::class); + $relay2 + ->method('waitFrame') + ->willReturnOnConsecutiveCalls( + new Frame('Service.Process{}', [1, 15]), + new Frame('Service.Process{}', [2, 15]), + new Frame('Service.Process{}', [3, 15]) + ); + $relay2 + ->method('send') + ->withConsecutive( + [new Frame('Service.Process{"Name":"bar","Value":18}', [1, 15], 8)], + [new Frame('Service.Process{"Name":"bar","Value":18}', [2, 15], 8)], + [new Frame('Service.Process{"Name":"bar","Value":18}', [3, 15], 8)] + ); + + $conn1 = new RPC($relay1); + $conn2 = new RPC($relay2); + + $conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]); + $conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]); + $conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]); + $conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]); + $conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]); + $conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]); + } + /** * @return RPC */