Skip to content

[client] Introduce routes. Foundation for multi transport support. #534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Sep 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
5038676
Merge branch 'master' into client-impr-topics-commands-configuration
makasim Sep 13, 2018
ed942f9
[client] Introduce route and route collection. Rework tags
makasim Sep 13, 2018
ee8f22b
Merge branch 'client-driver-typehints' into client-impr-topics-comman…
makasim Sep 14, 2018
92ff732
Merge branch 'client-driver-typehints' into client-impr-topics-comman…
makasim Sep 14, 2018
b9b62be
Merge branch 'client-driver-typehints' into client-impr-topics-comman…
makasim Sep 14, 2018
977b3b8
Merge branch 'client-driver-typehints' into client-impr-topics-comman…
makasim Sep 14, 2018
c710e9f
Reowrk router processor, remove client routing compiler pass.
makasim Sep 14, 2018
e039e88
add analyze route collection pass
makasim Sep 14, 2018
e397abc
Rework exclusive command extension.
makasim Sep 14, 2018
cf03113
remove tests.
makasim Sep 14, 2018
41a57c2
[client] udpdate config.
makasim Sep 14, 2018
00269de
[client] Introduce GenericDriver.
makasim Sep 14, 2018
b10a438
[client] Add generic driver tests.
makasim Sep 17, 2018
1037c7f
[client] Rework AmqpDriver
makasim Sep 17, 2018
570d34d
Merge remote-tracking branch 'origin/master' into client-impr-topics-…
makasim Sep 17, 2018
613de56
[client] GenericDriver should init delivery delay, prioirt, time to l…
makasim Sep 17, 2018
cde3e3c
[client] fix tests, update rabbitmq driver.
makasim Sep 17, 2018
2e84939
[client] Extend FsDriver from GenericDriver.
makasim Sep 17, 2018
ca7b0b7
[client] Extend DbalDriver from GenericDriver.
makasim Sep 18, 2018
ff3c342
[client] Extend GpsDriver from GenericDriver.
makasim Sep 18, 2018
99e178e
[client] Extend MongodbDriver from GenericDriver.
makasim Sep 18, 2018
84464f0
[client] Remove NullDriver. Use generic instead.
makasim Sep 18, 2018
8bac597
[client] Extend StompDriver from GenericDriver.
makasim Sep 18, 2018
34ca2f6
[client] Extend RabbitMqStompDriver from GenericDriver.
makasim Sep 18, 2018
3f26ba7
[client] Extend rest of drivers from generic one. Rework driver factory.
makasim Sep 19, 2018
f90c8d1
upd simple client.
makasim Sep 21, 2018
c075ec2
upd drivers
makasim Sep 21, 2018
2b9cb8a
upd client.
makasim Sep 21, 2018
e25feb1
client fixes
makasim Sep 21, 2018
39924b9
remove stom examples.
makasim Sep 21, 2018
19ace56
[client] migrate simple client to new concept. adopt route collection.
makasim Sep 21, 2018
cffa1b6
[client] Generic driver should not support message bus
makasim Sep 24, 2018
9904f7f
[job-queue] sync with latest design changes.
makasim Sep 27, 2018
eca10ef
[async-commands][async-events] Sync with latest design chagnes in core.
makasim Sep 27, 2018
ed707fe
Update enqueue architecture. Get rid of meta (queue|topic), rely one …
makasim Sep 27, 2018
7d50b6b
[bundle] Sync Symfony bundle with arch changes.
makasim Sep 27, 2018
dfc1412
[simple-client] Do not use Symfony Container
makasim Sep 27, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/client/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ use Interop\Queue\PsrProcessor;

/** @var \Enqueue\SimpleClient\SimpleClient $client */

$client->bind('a_bar_topic', 'a_processor_name', function(PsrMessage $psrMessage) {
$client->bindTopic('a_bar_topic', function(PsrMessage $psrMessage) {
// processing logic here

return PsrProcessor::ACK;
Expand Down
3 changes: 1 addition & 2 deletions docs/client/rpc_call.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ use Interop\Queue\PsrContext;
use Enqueue\Consumption\Result;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Client\Config;
use Enqueue\SimpleClient\SimpleClient;

/** @var \Interop\Queue\PsrContext $context */

// composer require enqueue/amqp-ext # or enqueue/amqp-bunny, enqueue/amqp-lib
$client = new SimpleClient('amqp:');

$client->bind(Config::COMMAND_TOPIC, 'square', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
$client->bindCommand('square', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
$number = (int) $message->getBody();

return Result::reply($context->createMessage($number ^ 2));
Expand Down
2 changes: 1 addition & 1 deletion docs/laravel/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;

$app->resolving(SimpleClient::class, function (SimpleClient $client, $app) {
$client->bind('enqueue_test', 'a_processor', function(PsrMessage $message) {
$client->bindTopic('enqueue_test', function(PsrMessage $message) {
// do stuff here

return PsrProcessor::ACK;
Expand Down
19 changes: 9 additions & 10 deletions docs/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,16 @@ $client = new SimpleClient('amqp:');

// composer require enqueue/fs
$client = new SimpleClient('file://foo/bar');

$client->setupBroker();

$client->sendEvent('a_foo_topic', 'message');

$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) {
$client->bindTopic('a_foo_topic', function(PsrMessage $message) {
echo $message->getBody().PHP_EOL;

// your event processor logic here
});

$client->setupBroker();

$client->sendEvent('a_foo_topic', 'message');

// this is a blocking call, it'll consume message until it is interrupted
$client->consume();
```
Expand All @@ -207,18 +206,18 @@ $client = new SimpleClient('amqp:');
// composer require enqueue/fs
//$client = new SimpleClient('file://foo/bar');

$client->setupBroker();

$client->bind(Config::COMMAND_TOPIC, 'bar_command', function(PsrMessage $message) {
$client->bindCommand('bar_command', function(PsrMessage $message) {
// your bar command processor logic here
});

$client->bind(Config::COMMAND_TOPIC, 'baz_reply_command', function(PsrMessage $message, PsrContext $context) {
$client->bindCommand('baz_reply_command', function(PsrMessage $message, PsrContext $context) {
// your baz reply command processor logic here

return Result::reply($context->createMessage('theReplyBody'));
});

$client->setupBroker();

// It is sent to one consumer.
$client->sendCommand('bar_command', 'aMessageData');

Expand Down
3 changes: 1 addition & 2 deletions pkg/async-command/Resources/config/services.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
services:
enqueue.async_command.run_command_processor:
class: 'Enqueue\AsyncCommand\RunCommandProcessor'
public: public
arguments:
- '%kernel.project_dir%'
tags:
- { name: 'enqueue.client.processor' }
- { name: 'enqueue.command_subscriber', client: 'default' }
6 changes: 3 additions & 3 deletions pkg/async-command/RunCommandProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public function process(PsrMessage $message, PsrContext $context): Result
public static function getSubscribedCommand(): array
{
return [
'processorName' => Commands::RUN_COMMAND,
'queueName' => Commands::RUN_COMMAND,
'queueNameHardcoded' => true,
'command' => Commands::RUN_COMMAND,
'queue' => Commands::RUN_COMMAND,
'prefix_queue' => false,
'exclusive' => true,
];
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/async-command/Tests/RunCommandProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public function testShouldSubscribeOnRunCommand()
$subscription = RunCommandProcessor::getSubscribedCommand();

$this->assertSame([
'processorName' => Commands::RUN_COMMAND,
'queueName' => Commands::RUN_COMMAND,
'queueNameHardcoded' => true,
'command' => Commands::RUN_COMMAND,
'queue' => Commands::RUN_COMMAND,
'prefix_queue' => false,
'exclusive' => true,
], $subscription);
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/async-event-dispatcher/AsyncProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

namespace Enqueue\AsyncEventDispatcher;

use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Consumption\Result;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

class AsyncProcessor implements PsrProcessor
class AsyncProcessor implements PsrProcessor, CommandSubscriberInterface
{
/**
* @var Registry
Expand All @@ -20,10 +21,6 @@ class AsyncProcessor implements PsrProcessor
*/
private $dispatcher;

/**
* @param Registry $registry
* @param EventDispatcherInterface $dispatcher
*/
public function __construct(Registry $registry, EventDispatcherInterface $dispatcher)
{
$this->registry = $registry;
Expand All @@ -39,9 +36,6 @@ public function __construct(Registry $registry, EventDispatcherInterface $dispat
$this->dispatcher = $dispatcher;
}

/**
* {@inheritdoc}
*/
public function process(PsrMessage $message, PsrContext $context)
{
if (false == $eventName = $message->getProperty('event_name')) {
Expand All @@ -57,4 +51,9 @@ public function process(PsrMessage $message, PsrContext $context)

return self::ACK;
}

public static function getSubscribedCommand()
{
return Commands::DISPATCH_ASYNC_EVENTS;
}
}
8 changes: 8 additions & 0 deletions pkg/async-event-dispatcher/Commands.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Enqueue\AsyncEventDispatcher;

final class Commands
{
const DISPATCH_ASYNC_EVENTS = 'symfony.dispatch_async_events';
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@

class AsyncEventsPass implements CompilerPassInterface
{
/**
* {@inheritdoc}
*/
public function process(ContainerBuilder $container)
public function process(ContainerBuilder $container): void
{
if (false == $container->hasDefinition('enqueue.events.async_listener')) {
return;
Expand Down Expand Up @@ -45,8 +42,9 @@ public function process(ContainerBuilder $container)
;

$container->getDefinition('enqueue.events.async_processor')
->addTag('enqueue.client.processor', [
'topicName' => 'event.'.$event,
->addTag('enqueue.processor', [
'topic' => 'event.'.$event,
'client' => 'default',
])
;

Expand Down Expand Up @@ -78,8 +76,9 @@ public function process(ContainerBuilder $container)
;

$container->getDefinition('enqueue.events.async_processor')
->addTag('enqueue.client.processor', [
->addTag('enqueue.processor', [
'topicName' => 'event.'.$event,
'client' => 'default',
])
;

Expand Down
9 changes: 4 additions & 5 deletions pkg/async-event-dispatcher/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ services:
- '@enqueue.events.event_dispatcher'
tags:
-
name: 'enqueue.client.processor'
topicName: '__command__'
processorName: '%enqueue_events_queue%'
queueName: '%enqueue_events_queue%'
queueNameHardcoded: true
name: 'enqueue.processor'
command: 'symfony.dispatch_async_events'
queue: '%enqueue_events_queue%'
queue_prefixed: false
exclusive: true

enqueue.events.php_serializer_event_transofrmer:
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading