diff --git a/docs/bundle/message_processor.md b/docs/bundle/message_processor.md new file mode 100644 index 000000000..1cc8b1d0f --- /dev/null +++ b/docs/bundle/message_processor.md @@ -0,0 +1,81 @@ +# Message processor + +Message processors and usage examples described in [consumption/message_processor](../consumption/message_processor.md) +Here we just show how to register a message processor service to enqueue. Let's say we have app bundle and a message processor there + +* [Container tag](#container-tag) +* [Topic subscriber](#topic-subscriber) + +# Container tag + +```yaml +# src/AppBundle/Resources/services.yml + +services: + app.async.say_hello_processor: + class: 'AppBundle\Async\SayHelloProcessor' + tags: + - { name: 'enqueue.client.message_processor', topicName: 'aTopic' } + +``` + +The tag has some additional options: + +* topicName [Req]: Tells what topic to consume messages from. +* queueName: By default message processor does not require an extra queue on broker side. It reuse a default one. Setting the option you can define a custom queue to be used. +* processorName: By default the service id is used as message processor name. Using the option you can define a custom name. + +# Topic subscriber + +There is a `TopicSubscriber` interface (like [EventSubscriberInterface](https://github.com/symfony/symfony/blob/master/src/Symfony/Component/EventDispatcher/EventSubscriberInterface.php)). +It allows to keep subscription login and process logic closer to each other. + +```php + ['queueName' => 'fooQueue', 'processorName' => 'foo'], + 'anotherTopic' => ['queueName' => 'barQueue', 'processorName' => 'bar'], + ]; + } +} +``` + +In the container you can just add the tag `enqueue.client.message_processor` and omit any other options: + +```yaml +# src/AppBundle/Resources/services.yml + +services: + app.async.say_hello_processor: + class: 'AppBundle\Async\SayHelloProcessor' + tags: + - { name: 'enqueue.client.message_processor'} + +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/consumption/message_processor.md b/docs/consumption/message_processor.md new file mode 100644 index 000000000..69434f467 --- /dev/null +++ b/docs/consumption/message_processor.md @@ -0,0 +1,133 @@ +# Message processor + +The message processor is an object that actually process the message and must return a result status. +Here's example: + +```php +mailer->send('foo@example.com', $message->getBody()); + + return self::ACK; + } +} +``` + +Usually there is no need to catch exceptions. +The message broker can detect consumer has failed and redeliver the message. +Sometimes you have to reject messages explicitly. + +```php +getBody()); + if ($user = $this->userRepository->find($data['userId'])) { + return self::REJECT; + } + + $this->mailer->send($user->getEmail(), $data['text']); + + return self::ACK; + } +} +``` + +It is possible to find out whether the message failed previously or not. +There is `isRedelivered` method for that. +If it returns true than there was attempt to process message. + +```php +isRedelivered()) { + return self::REQUEUE; + } + + $this->mailer->send('foo@example.com', $message->getBody()); + + return self::ACK; + } +} +``` + +The second argument is your context. You can use it to send messages to other queues\topics. + +```php +mailer->send('foo@example.com', $message->getBody()); + + $queue = $context->createQueue('anotherQueue'); + $message = $context->createMessage('Message has been sent'); + $context->createProducer()->send($queue, $message); + + return self::ACK; + } +} +``` + +The consumption component provide some useful extensions, for example there is an extension that makes RPC processing simplier. + +```php +mailer->send('foo@example.com', $message->getBody()); + + $replyMessage = $context->createMessage('Message has been sent'); + + return Result::reply($replyMessage); + } +} + +/** @var \Enqueue\Psr\Context $psrContext */ + +$queueConsumer = new QueueConsumer($psrContext, new ChainExtension([ + new ReplyExtension() +])); + +$queueConsumer->bind('foo', new SendMailProcessor()); + +$queueConsumer->consume(); +``` + +[back to index](../index.md) \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index ada57c0ab..867ce2120 100644 --- a/docs/index.md +++ b/docs/index.md @@ -8,6 +8,7 @@ - [Null](null_transport.md) * Consumption - [Extensions](consumption/extensions.md) + - [Message processor](consumption/message_processor.md) * Client - [Message examples](client/message_examples.md) - [Supported brokers](client/supported_brokers.md) @@ -19,6 +20,7 @@ - [Quick tour](bundle/quick_tour.md) - [Config reference](bundle/config_reference.md) - [Cli commands](bundle/cli_commands.md) + - [Message processor](bundle/message_processor.md) - [Job queue](bundle/job_queue.md) - [Consumption extension](bundle/consumption_extension.md) - [Production settings](bundle/production_settings.md)