1
1
# Client. RPC call
2
2
3
3
The client's [ quick tour] ( quick_tour.md ) describes how to get the client object.
4
- Here we'll use ` Enqueue\SimpleClient\SimpleClient ` though it is not required .
5
- You can get all that stuff from manually built client or get objects from a container (Symfony) .
4
+ Here we'll show you how to use Enqueue Client to perform a [ RPC call ] ( https://en.wikipedia.org/wiki/Remote_procedure_call ) .
5
+ You can do it by defining a command which returns something .
6
6
7
- The simple client could be created like this:
7
+ ## The consumer side
8
8
9
- ## The client side
9
+ On the consumer side we have to register a command processor which computes the result and send it back to the sender.
10
+ Pay attention that you have to add reply extension. It wont work without it.
10
11
11
- There is a handy class RpcClient shipped with the client component.
12
- It allows you to easily perform [ RPC calls] ( https://en.wikipedia.org/wiki/Remote_procedure_call ) .
13
- It send a message and wait for a reply.
14
-
15
- ``` php
16
- <?php
17
- use Enqueue\Client\RpcClient;
18
- use Enqueue\SimpleClient\SimpleClient;
19
-
20
- $client = new SimpleClient('amqp://');
21
-
22
- $rpcClient = new RpcClient($client->getProducer(), $context);
23
-
24
- $replyMessage = $rpcClient->call('greeting_topic', 'Hi Thomas!', 5);
25
- ```
26
-
27
- You can perform several requests asynchronously with ` callAsync ` and request replays later.
28
-
29
- ``` php
30
- <?php
31
- use Enqueue\Client\RpcClient;
32
- use Enqueue\SimpleClient\SimpleClient;
33
-
34
- $client = new SimpleClient('amqp://');
35
-
36
- $rpcClient = new RpcClient($client->getProducer(), $context);
37
-
38
- $promises = [];
39
- $promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
40
- $promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
41
- $promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
42
- $promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
43
-
44
- $replyMessages = [];
45
- foreach ($promises as $promise) {
46
- $replyMessages[] = $promise->receive();
47
- }
48
- ```
49
-
50
- ## The server side
51
-
52
- On the server side you may register a processor which returns a result object with a reply message set.
53
12
Of course it is possible to implement rpc server side based on transport classes only. That would require a bit more work to do.
54
13
55
14
``` php
@@ -60,19 +19,61 @@ use Interop\Queue\PsrContext;
60
19
use Enqueue\Consumption\Result;
61
20
use Enqueue\Consumption\ChainExtension;
62
21
use Enqueue\Consumption\Extension\ReplyExtension;
22
+ use Enqueue\Client\Config;
63
23
use Enqueue\SimpleClient\SimpleClient;
64
24
65
25
/** @var \Interop\Queue\PsrContext $context */
66
26
67
- $client = new SimpleClient('amqp://');
27
+ // composer require enqueue/amqp-ext # or enqueue/amqp-bunny, enqueue/amqp-lib
28
+ $client = new SimpleClient('amqp:');
68
29
69
- $client->bind('greeting_topic' , 'greeting_processor ', function (PsrMessage $message, PsrContext $context) use (& $requestMessage) {
70
- echo $message->getBody();
30
+ $client->bind(Config::COMMAND_TOPIC , 'square ', function (PsrMessage $message, PsrContext $context) use (& $requestMessage) {
31
+ $number = (int) $message->getBody();
71
32
72
- return Result::reply($context->createMessage('Hi there! I am John.' ));
33
+ return Result::reply($context->createMessage($number ^ 2 ));
73
34
});
74
35
75
36
$client->consume(new ChainExtension([new ReplyExtension()]));
76
37
```
77
38
78
39
[ back to index] ( ../index.md )
40
+
41
+ ## The sender side
42
+
43
+ On the sender's side we need a client which send a command and wait for reply messages.
44
+
45
+ ``` php
46
+ <?php
47
+ use Enqueue\SimpleClient\SimpleClient;
48
+
49
+ $client = new SimpleClient('amqp:');
50
+
51
+ echo $client->sendCommand('square', 5, true)->receive(5000 /* 5 sec */)->getBody();
52
+ ```
53
+
54
+ You can perform several requests asynchronously with ` sendCommand ` and ask for replays later.
55
+
56
+ ``` php
57
+ <?php
58
+ use Enqueue\SimpleClient\SimpleClient;
59
+
60
+ $client = new SimpleClient('amqp:');
61
+
62
+ /** @var \Enqueue\Rpc\Promise[] $promises */
63
+ $promises = [];
64
+ $promises[] = $client->sendCommand('square', 5, true);
65
+ $promises[] = $client->sendCommand('square', 10, true);
66
+ $promises[] = $client->sendCommand('square', 7, true);
67
+ $promises[] = $client->sendCommand('square', 12, true);
68
+
69
+ $replyMessages = [];
70
+ while ($promises) {
71
+ foreach ($promises as $index => $promise) {
72
+ if ($replyMessage = $promise->receiveNoWait()) {
73
+ $replyMessages[$index] = $replyMessage;
74
+
75
+ unset($promises[$index]);
76
+ }
77
+ }
78
+ }
79
+ ```
0 commit comments