Skip to content

[doc] Add docs about message processors. #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions docs/bundle/message_processor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Message processor

Message processors and usage examples described in [consumption/message_processor](../consumption/message_processor.md)
Here we just show how to register a message processor service to enqueue. Let's say we have app bundle and a message processor there

* [Container tag](#container-tag)
* [Topic subscriber](#topic-subscriber)

# Container tag

```yaml
# src/AppBundle/Resources/services.yml

services:
app.async.say_hello_processor:
class: 'AppBundle\Async\SayHelloProcessor'
tags:
- { name: 'enqueue.client.message_processor', topicName: 'aTopic' }

```

The tag has some additional options:

* topicName [Req]: Tells what topic to consume messages from.
* queueName: By default message processor does not require an extra queue on broker side. It reuse a default one. Setting the option you can define a custom queue to be used.
* processorName: By default the service id is used as message processor name. Using the option you can define a custom name.

# Topic subscriber

There is a `TopicSubscriber` interface (like [EventSubscriberInterface](https://github.com/symfony/symfony/blob/master/src/Symfony/Component/EventDispatcher/EventSubscriberInterface.php)).
It allows to keep subscription login and process logic closer to each other.

```php
<?php
namespace AppBundle\Async;

use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Psr\Processor;

class SayHelloProcessor implements Processor, TopicSubscriberInterface
{
public static function getSubscribedTopics()
{
return ['aTopic', 'anotherTopic'];
}
}
```

On the topic subscriber you can also define queue and processor name:

```php
<?php
use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Psr\Processor;

class SayHelloProcessor implements Processor, TopicSubscriberInterface
{
public static function getSubscribedTopics()
{
return [
'aTopic' => ['queueName' => 'fooQueue', 'processorName' => 'foo'],
'anotherTopic' => ['queueName' => 'barQueue', 'processorName' => 'bar'],
];
}
}
```

In the container you can just add the tag `enqueue.client.message_processor` and omit any other options:

```yaml
# src/AppBundle/Resources/services.yml

services:
app.async.say_hello_processor:
class: 'AppBundle\Async\SayHelloProcessor'
tags:
- { name: 'enqueue.client.message_processor'}

```

[back to index](../index.md)
133 changes: 133 additions & 0 deletions docs/consumption/message_processor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Message processor

The message processor is an object that actually process the message and must return a result status.
Here's example:

```php
<?php
use Enqueue\Psr\Processor;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;

class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
$this->mailer->send('foo@example.com', $message->getBody());

return self::ACK;
}
}
```

Usually there is no need to catch exceptions.
The message broker can detect consumer has failed and redeliver the message.
Sometimes you have to reject messages explicitly.

```php
<?php
use Enqueue\Psr\Processor;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Util\JSON;

class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
$data = JSON::decode($message->getBody());
if ($user = $this->userRepository->find($data['userId'])) {
return self::REJECT;
}

$this->mailer->send($user->getEmail(), $data['text']);

return self::ACK;
}
}
```

It is possible to find out whether the message failed previously or not.
There is `isRedelivered` method for that.
If it returns true than there was attempt to process message.

```php
<?php
use Enqueue\Psr\Processor;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;

class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
if ($message->isRedelivered()) {
return self::REQUEUE;
}

$this->mailer->send('foo@example.com', $message->getBody());

return self::ACK;
}
}
```

The second argument is your context. You can use it to send messages to other queues\topics.

```php
<?php
use Enqueue\Psr\Processor;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;

class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
$this->mailer->send('foo@example.com', $message->getBody());

$queue = $context->createQueue('anotherQueue');
$message = $context->createMessage('Message has been sent');
$context->createProducer()->send($queue, $message);

return self::ACK;
}
}
```

The consumption component provide some useful extensions, for example there is an extension that makes RPC processing simplier.

```php
<?php
use Enqueue\Psr\Processor;
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\Result;

class SendMailProcessor implements Processor
{
public function process(Message $message, Context $context)
{
$this->mailer->send('foo@example.com', $message->getBody());

$replyMessage = $context->createMessage('Message has been sent');

return Result::reply($replyMessage);
}
}

/** @var \Enqueue\Psr\Context $psrContext */

$queueConsumer = new QueueConsumer($psrContext, new ChainExtension([
new ReplyExtension()
]));

$queueConsumer->bind('foo', new SendMailProcessor());

$queueConsumer->consume();
```

[back to index](../index.md)
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- [Null](null_transport.md)
* Consumption
- [Extensions](consumption/extensions.md)
- [Message processor](consumption/message_processor.md)
* Client
- [Message examples](client/message_examples.md)
- [Supported brokers](client/supported_brokers.md)
Expand All @@ -19,6 +20,7 @@
- [Quick tour](bundle/quick_tour.md)
- [Config reference](bundle/config_reference.md)
- [Cli commands](bundle/cli_commands.md)
- [Message processor](bundle/message_processor.md)
- [Job queue](bundle/job_queue.md)
- [Consumption extension](bundle/consumption_extension.md)
- [Production settings](bundle/production_settings.md)
Expand Down