Skip to content

Commit cc134bd

Browse files
authored
Merge pull request #50 from php-enqueue/client-rpc-client
[client] Add RpcClient on client level.
2 parents df7afac + 7d60675 commit cc134bd

25 files changed

+705
-47
lines changed

docs/client/rpc_call.md

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Client. RPC call
2+
3+
4+
## The client side
5+
6+
There is a handy class RpcClient shipped with the client component.
7+
It allows you to easily send a message and wait for a reply.
8+
9+
```php
10+
<?php
11+
use Enqueue\Client\SimpleClient;
12+
use Enqueue\Client\RpcClient;
13+
14+
/** @var \Enqueue\Psr\PsrContext $context */
15+
16+
17+
$client = new SimpleClient($context);
18+
$rpcClient = new RpcClient($client->getProducer(), $context);
19+
20+
$replyMessage = $rpcClient->call('greeting_topic', 'Hi Thomas!', 5);
21+
```
22+
23+
You can perform several requests asynchronously with `callAsync` and request replays later.
24+
25+
```php
26+
<?php
27+
use Enqueue\Client\SimpleClient;
28+
use Enqueue\Client\RpcClient;
29+
30+
/** @var \Enqueue\Psr\PsrContext $context */
31+
32+
33+
$client = new SimpleClient($context);
34+
$rpcClient = new RpcClient($client->getProducer(), $context);
35+
36+
$promises = [];
37+
$promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
38+
$promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
39+
$promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
40+
$promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
41+
42+
$replyMessages = [];
43+
foreach ($promises as $promise) {
44+
$replyMessages[] = $promise->getMessage();
45+
}
46+
```
47+
48+
## The server side
49+
50+
On the server side you may register a processor which returns a result object with a reply message set.
51+
Of course it is possible to implement rpc server side based on transport classes only. That would require a bit more work to do.
52+
53+
```php
54+
<?php
55+
56+
use Enqueue\Client\SimpleClient;
57+
use Enqueue\Psr\PsrMessage;
58+
use Enqueue\Psr\PsrContext;
59+
use Enqueue\Consumption\Result;
60+
use Enqueue\Consumption\ChainExtension;
61+
use Enqueue\Consumption\Extension\ReplyExtension;
62+
63+
/** @var \Enqueue\Psr\PsrContext $context */
64+
65+
$client = new SimpleClient($this->context);
66+
$client->bind('greeting_topic', 'greeting_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
67+
echo $message->getBody();
68+
69+
return Result::reply($context->createMessage('Hi there! I am John.'));
70+
});
71+
72+
$client->consume(new ChainExtension([new ReplyExtension()]));
73+
```
74+
75+
[back to index](../index.md)

docs/index.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
- [Message examples](client/message_examples.md)
1414
- [Supported brokers](client/supported_brokers.md)
1515
- [Message bus](client/message_bus.md)
16+
- [RPC call](client/rpc_call.md)
1617
* Job queue
1718
- [Run unique job](job_queue/run_unique_job.md)
1819
- [Run sub job(s)](job_queue/run_sub_job.md)

pkg/amqp-ext/Client/AmqpDriver.php

+4
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ public function createTransportMessage(Message $message)
145145
$transportMessage->setProperties($properties);
146146
$transportMessage->setMessageId($message->getMessageId());
147147
$transportMessage->setTimestamp($message->getTimestamp());
148+
$transportMessage->setReplyTo($message->getReplyTo());
149+
$transportMessage->setCorrelationId($message->getCorrelationId());
148150

149151
return $transportMessage;
150152
}
@@ -174,6 +176,8 @@ public function createClientMessage(PsrMessage $message)
174176

175177
$clientMessage->setMessageId($message->getMessageId());
176178
$clientMessage->setTimestamp($message->getTimestamp());
179+
$clientMessage->setReplyTo($message->getReplyTo());
180+
$clientMessage->setCorrelationId($message->getCorrelationId());
177181

178182
return $clientMessage;
179183
}

pkg/amqp-ext/Tests/Client/AmqpDriverTest.php

+12
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public function testShouldConvertTransportMessageToClientMessage()
7575
$transportMessage->setHeader('expiration', '12345000');
7676
$transportMessage->setMessageId('MessageId');
7777
$transportMessage->setTimestamp(1000);
78+
$transportMessage->setReplyTo('theReplyTo');
79+
$transportMessage->setCorrelationId('theCorrelationId');
7880

7981
$driver = new AmqpDriver(
8082
$this->createPsrContextMock(),
@@ -92,6 +94,8 @@ public function testShouldConvertTransportMessageToClientMessage()
9294
'expiration' => '12345000',
9395
'message_id' => 'MessageId',
9496
'timestamp' => 1000,
97+
'reply_to' => 'theReplyTo',
98+
'correlation_id' => 'theCorrelationId',
9599
], $clientMessage->getHeaders());
96100
$this->assertSame([
97101
'key' => 'val',
@@ -100,6 +104,8 @@ public function testShouldConvertTransportMessageToClientMessage()
100104
$this->assertSame(12345, $clientMessage->getExpire());
101105
$this->assertSame('ContentType', $clientMessage->getContentType());
102106
$this->assertSame(1000, $clientMessage->getTimestamp());
107+
$this->assertSame('theReplyTo', $clientMessage->getReplyTo());
108+
$this->assertSame('theCorrelationId', $clientMessage->getCorrelationId());
103109
}
104110

105111
public function testShouldThrowExceptionIfExpirationIsNotNumeric()
@@ -129,6 +135,8 @@ public function testShouldConvertClientMessageToTransportMessage()
129135
$clientMessage->setExpire(123);
130136
$clientMessage->setMessageId('MessageId');
131137
$clientMessage->setTimestamp(1000);
138+
$clientMessage->setReplyTo('theReplyTo');
139+
$clientMessage->setCorrelationId('theCorrelationId');
132140

133141
$context = $this->createPsrContextMock();
134142
$context
@@ -154,12 +162,16 @@ public function testShouldConvertClientMessageToTransportMessage()
154162
'delivery_mode' => 2,
155163
'message_id' => 'MessageId',
156164
'timestamp' => 1000,
165+
'reply_to' => 'theReplyTo',
166+
'correlation_id' => 'theCorrelationId',
157167
], $transportMessage->getHeaders());
158168
$this->assertSame([
159169
'key' => 'val',
160170
], $transportMessage->getProperties());
161171
$this->assertSame('MessageId', $transportMessage->getMessageId());
162172
$this->assertSame(1000, $transportMessage->getTimestamp());
173+
$this->assertSame('theReplyTo', $transportMessage->getReplyTo());
174+
$this->assertSame('theCorrelationId', $transportMessage->getCorrelationId());
163175
}
164176

165177
public function testShouldSendMessageToRouter()

pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php

+12
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public function testShouldConvertTransportMessageToClientMessage()
8484
$transportMessage->setHeader('priority', 3);
8585
$transportMessage->setMessageId('MessageId');
8686
$transportMessage->setTimestamp(1000);
87+
$transportMessage->setReplyTo('theReplyTo');
88+
$transportMessage->setCorrelationId('theCorrelationId');
8789

8890
$driver = new RabbitMqDriver(
8991
$this->createPsrContextMock(),
@@ -102,6 +104,8 @@ public function testShouldConvertTransportMessageToClientMessage()
102104
'priority' => 3,
103105
'message_id' => 'MessageId',
104106
'timestamp' => 1000,
107+
'reply_to' => 'theReplyTo',
108+
'correlation_id' => 'theCorrelationId',
105109
], $clientMessage->getHeaders());
106110
$this->assertSame([
107111
'key' => 'val',
@@ -113,6 +117,8 @@ public function testShouldConvertTransportMessageToClientMessage()
113117
$this->assertSame('ContentType', $clientMessage->getContentType());
114118
$this->assertSame(1000, $clientMessage->getTimestamp());
115119
$this->assertSame(MessagePriority::HIGH, $clientMessage->getPriority());
120+
$this->assertSame('theReplyTo', $clientMessage->getReplyTo());
121+
$this->assertSame('theCorrelationId', $clientMessage->getCorrelationId());
116122
}
117123

118124
public function testShouldThrowExceptionIfXDelayIsNotNumeric()
@@ -202,6 +208,8 @@ public function testShouldConvertClientMessageToTransportMessage()
202208
$clientMessage->setDelay(432);
203209
$clientMessage->setMessageId('MessageId');
204210
$clientMessage->setTimestamp(1000);
211+
$clientMessage->setReplyTo('theReplyTo');
212+
$clientMessage->setCorrelationId('theCorrelationId');
205213

206214
$context = $this->createPsrContextMock();
207215
$context
@@ -227,6 +235,8 @@ public function testShouldConvertClientMessageToTransportMessage()
227235
'delivery_mode' => 2,
228236
'message_id' => 'MessageId',
229237
'timestamp' => 1000,
238+
'reply_to' => 'theReplyTo',
239+
'correlation_id' => 'theCorrelationId',
230240
'priority' => 4,
231241
], $transportMessage->getHeaders());
232242
$this->assertSame([
@@ -235,6 +245,8 @@ public function testShouldConvertClientMessageToTransportMessage()
235245
], $transportMessage->getProperties());
236246
$this->assertSame('MessageId', $transportMessage->getMessageId());
237247
$this->assertSame(1000, $transportMessage->getTimestamp());
248+
$this->assertSame('theReplyTo', $transportMessage->getReplyTo());
249+
$this->assertSame('theCorrelationId', $transportMessage->getCorrelationId());
238250
}
239251

240252
public function testThrowIfDelayNotSupportedOnConvertClientMessageToTransportMessage()

pkg/enqueue-bundle/Resources/config/client.yml

+6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ services:
1010
enqueue.producer:
1111
alias: 'enqueue.client.producer'
1212

13+
enqueue.client.rpc_client:
14+
class: 'Enqueue\Client\RpcClient'
15+
arguments:
16+
- '@enqueue.client.producer'
17+
- '@enqueue.transport.context'
18+
1319
enqueue.client.router_processor:
1420
class: 'Enqueue\Client\RouterProcessor'
1521
public: true

pkg/enqueue-bundle/Resources/config/services.yml

+5
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,8 @@ services:
1717
- '@enqueue.consumption.queue_consumer'
1818
tags:
1919
- { name: 'console.command' }
20+
21+
enqueue.transport.rpc_client:
22+
class: 'Enqueue\Rpc\RpcClient'
23+
arguments:
24+
- '@enqueue.transport.context'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional;
4+
5+
/**
6+
* @group functional
7+
*/
8+
class RpcClientTest extends WebTestCase
9+
{
10+
public function testTransportRpcClientCouldBeGetFromContainerAsService()
11+
{
12+
$connection = $this->container->get('enqueue.transport.rpc_client');
13+
14+
$this->assertInstanceOf(\Enqueue\Rpc\RpcClient::class, $connection);
15+
}
16+
17+
public function testClientRpcClientCouldBeGetFromContainerAsService()
18+
{
19+
$connection = $this->container->get('enqueue.client.rpc_client');
20+
21+
$this->assertInstanceOf(\Enqueue\Client\RpcClient::class, $connection);
22+
}
23+
}

pkg/enqueue/Client/Message.php

+42
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,16 @@ class Message
5353
*/
5454
private $delay;
5555

56+
/**
57+
* @var string
58+
*/
59+
private $replyTo;
60+
61+
/**
62+
* @var string
63+
*/
64+
private $correlationId;
65+
5666
/**
5767
* @var array
5868
*/
@@ -204,6 +214,38 @@ public function getScope()
204214
return $this->scope;
205215
}
206216

217+
/**
218+
* @return string
219+
*/
220+
public function getReplyTo()
221+
{
222+
return $this->replyTo;
223+
}
224+
225+
/**
226+
* @param string $replyTo
227+
*/
228+
public function setReplyTo($replyTo)
229+
{
230+
$this->replyTo = $replyTo;
231+
}
232+
233+
/**
234+
* @return string
235+
*/
236+
public function getCorrelationId()
237+
{
238+
return $this->correlationId;
239+
}
240+
241+
/**
242+
* @param string $correlationId
243+
*/
244+
public function setCorrelationId($correlationId)
245+
{
246+
$this->correlationId = $correlationId;
247+
}
248+
207249
/**
208250
* @return array
209251
*/

pkg/enqueue/Client/NullDriver.php

+7-3
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ class NullDriver implements DriverInterface
2121
protected $config;
2222

2323
/**
24-
* @param NullContext $session
24+
* @param NullContext $context
2525
* @param Config $config
2626
*/
27-
public function __construct(NullContext $session, Config $config)
27+
public function __construct(NullContext $context, Config $config)
2828
{
29-
$this->context = $session;
29+
$this->context = $context;
3030
$this->config = $config;
3131
}
3232

@@ -49,6 +49,8 @@ public function createTransportMessage(Message $message)
4949
$transportMessage->setProperties($message->getProperties());
5050
$transportMessage->setTimestamp($message->getTimestamp());
5151
$transportMessage->setMessageId($message->getMessageId());
52+
$transportMessage->setReplyTo($message->getReplyTo());
53+
$transportMessage->setCorrelationId($message->getCorrelationId());
5254

5355
return $transportMessage;
5456
}
@@ -66,6 +68,8 @@ public function createClientMessage(PsrMessage $message)
6668
$clientMessage->setProperties($message->getProperties());
6769
$clientMessage->setTimestamp($message->getTimestamp());
6870
$clientMessage->setMessageId($message->getMessageId());
71+
$clientMessage->setReplyTo($message->getReplyTo());
72+
$clientMessage->setCorrelationId($message->getCorrelationId());
6973

7074
if ($contentType = $message->getHeader('content_type')) {
7175
$clientMessage->setContentType($contentType);

0 commit comments

Comments
 (0)