Skip to content

Queue monitoring. #606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ Features:
* [Yii2. Amqp driver](docs/yii/amqp_driver.md)
* [Message bus](docs/quick_tour.md#client) support.
* [RPC over MQ](docs/quick_tour.md#remote-procedure-call-rpc) support.
* [Monitoring](monitoring.md)
* Temporary queues support.
* Well designed, decoupled and reusable components.
* Carefully tested (unit & functional).
Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"richardfullmer/rabbitmq-management-api": "^2.0",
"predis/predis": "^1.1",
"thruway/pawl-transport": "^0.5.0",
"voryx/thruway": "^0.5.3"
"voryx/thruway": "^0.5.3",
"influxdb/influxdb-php": "^1.14"
},
"require-dev": {
"phpunit/phpunit": "^5.5",
Expand Down Expand Up @@ -78,6 +79,7 @@
"Enqueue\\Test\\": "pkg/test/",
"Enqueue\\Dsn\\": "pkg/dsn/",
"Enqueue\\Wamp\\": "pkg/wamp/",
"Enqueue\\Monitoring\\": "pkg/monitoring/",
"Enqueue\\": "pkg/enqueue/"
},
"exclude-from-classmap": [
Expand Down
13 changes: 13 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,16 @@ services:
HOSTNAME_EXTERNAL: 'localstack'
SERVICES: 'sqs'

influxdb:
image: 'influxdb:latest'

chronograf:
image: 'chronograf:latest'
entrypoint: 'chronograf --influxdb-url=http://influxdb:8086'
ports:
- '8888:8888'

grafana:
image: 'grafana/grafana:latest'
ports:
- '3000:3000'
Binary file added docs/images/grafana_monitoring.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made
- [AMQP Interop driver](yii/amqp_driver.md)
* [EnqueueElasticaBundle. Overview](elastica-bundle/overview.md)
* [DSN Parser](dsn.md)
* [Monitoring](monitoring.md)
* [Use cases](#use-cases)
- [Symfony. Async event dispatcher](async_event_dispatcher/quick_tour.md)
- [Monolog. Send messages to message queue](monolog/send-messages-to-mq.md)
Expand Down
296 changes: 296 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
<h2 align="center">Supporting Enqueue</h2>

Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider:

- [Become a sponsor](https://www.patreon.com/makasim)
- [Become our client](http://forma-pro.com/)

---

# Monitoring.

Enqueue provides a tool for monitoring your queues.
With it, you can control how many messages were sent, how many processed successful or failed.
How many consumers are working, their up time, processed messages stats, memory usage and system load.
The tool could be integrated with virtually any analytics and monitoring platform.
There are several integration:
* [InfluxDB](https://www.influxdata.com/) and [Grafana](https://grafana.com/)
* [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/)

We are working on a JS\WAMP based real-time UI tool, for more information please [contact us](opensource@forma-pro.com).

![Grafana Monitoring](images/grafana_monitoring.jpg)

* [Installation](#installation)
* [Track sent messages](#track-sent-messages)
* [Track consumed message](#track-consumed-message)
* [Track consumer metrics](#track-consumer-metrics)
* [Consumption extension](#consumption-extension)
* [Enqueue Client Extension](#enqueue-client-extension)
* [InfluxDB Storage](#influxdb-storage)
* [WAMP (Web Socket Messaging Protocol) Storage](#wamp-(web-socket-messaging-protocol)-storage)
* [Symfony App](#symfony-app)

## Installation

```bash
composer req enqueue/monitoring:0.9.x-dev
```

## Track sent messages

```php
<?php
use Enqueue\Monitoring\SentMessageStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;

$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushSentMessageStats(new SentMessageStats(
(int) (microtime(true) * 1000), // timestamp
'queue_name', // queue
'aMessageId',
'aCorrelationId',
[], // headers
[] // properties
));
```

or, if you work with [Queue Interop](https://github.com/queue-interop/queue-interop) transport here's how you can track a message sent

```php
<?php
use Interop\Queue\Context;
use Enqueue\Monitoring\SentMessageStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;

/** @var Context $context */

$queue = $context->createQueue('foo');
$message = $context->createMessage('body');

$context->createProducer()->send($queue, $message);

$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushSentMessageStats(new SentMessageStats(
(int) (microtime(true) * 1000),
$queue->getQueueName(),
$message->getMessageId(),
$message->getCorrelationId(),
$message->getHeaders()[],
$message->getProperties()
));
```

## Track consumed message

```php
<?php
use Enqueue\Monitoring\ConsumedMessageStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;

$receivedAt = (int) (microtime(true) * 1000);

// heavy processing here.

$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
'consumerId',
(int) (microtime(true) * 1000), // now
$receivedAt,
'aQueue',
'aMessageId',
'aCorrelationId',
[], // headers
[], // properties
false, // redelivered or not
ConsumedMessageStats::STATUS_ACK
));
```

or, if you work with [Queue Interop](https://github.com/queue-interop/queue-interop) transport here's how you can track a message sent

```php
<?php
use Interop\Queue\Context;
use Enqueue\Monitoring\ConsumedMessageStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;

/** @var Context $context */

$queue = $context->createQueue('foo');

$consumer = $context->createConsumer($queue);

$consumerId = uniqid('consumer-id', true); // we suggest using UUID here
if ($message = $consumer->receiveNoWait()) {
$receivedAt = (int) (microtime(true) * 1000);

// heavy processing here.

$consumer->acknowledge($message);

$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushConsumedMessageStats(new ConsumedMessageStats(
$consumerId,
(int) (microtime(true) * 1000), // now
$receivedAt,
$queue->getQueueName(),
$message->getMessageId(),
$message->getCorrelationId(),
$message->getHeaders(),
$message->getProperties(),
$message->isRedelivered(),
ConsumedMessageStats::STATUS_ACK
));
}
```

## Track consumer metrics

Consumers are long running processes. It vital to know how many of them are running right now, how they perform, how much memory do they use and so.
This example shows how you can send such metrics.
Call this code from time to time between processing messages.

```php
<?php
use Enqueue\Monitoring\ConsumerStats;
use Enqueue\Monitoring\GenericStatsStorageFactory;

$startedAt = (int) (microtime(true) * 1000);

$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
$statsStorage->pushConsumerStats(new ConsumerStats(
'consumerId',
(int) (microtime(true) * 1000), // now
$startedAt,
null, // finished at
true, // is started?
false, // is finished?
false, // is failed
['foo'], // consume from queues
123, // received messages
120, // acknowledged messages
1, // rejected messages
1, // requeued messages
memory_get_usage(true),
sys_getloadavg()[0]
));
```

## Consumption extension

There is an extension `ConsumerMonitoringExtension` for Enqueue [QueueConsumer](quick_tour.md#consumption).
It could collect consumed messages and consumer stats for you.

```php
<?php
use Enqueue\Consumption\QueueConsumer;
use Enqueue\Consumption\ChainExtension;
use Enqueue\Monitoring\ConsumerMonitoringExtension;
use Enqueue\Monitoring\GenericStatsStorageFactory;
use Interop\Queue\Context;

/** @var Context $context */

$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');

$queueConsumer = new QueueConsumer($context, new ChainExtension([
new ConsumerMonitoringExtension($statsStorage)
]));

// bind

// consume
```

## Enqueue Client Extension

There is an extension ClientMonitoringExtension for Enqueue [Client](quick_tour.md#client) too. It could collect sent messages stats for you.

## InfluxDB Storage

Install additional packages:

```
composer req influxdb/influxdb-php:^1.14
```

```php
<?php
use Enqueue\Monitoring\GenericStatsStorageFactory;

$statsStorage = (new GenericStatsStorageFactory())->create('influxdb://127.0.0.1:8086?db=foo');
```

There are available options:

```
* 'host' => '127.0.0.1',
* 'port' => '8086',
* 'user' => '',
* 'password' => '',
* 'db' => 'enqueue',
* 'measurementSentMessages' => 'sent-messages',
* 'measurementConsumedMessages' => 'consumed-messages',
* 'measurementConsumers' => 'consumers',
```

## WAMP (Web Socket Messaging Protocol) Storage

Install additional packages:

```
composer req thruway/pawl-transport:^0.5.0 voryx/thruway:^0.5.3
```

```php
<?php
use Enqueue\Monitoring\GenericStatsStorageFactory;

$statsStorage = (new GenericStatsStorageFactory())->create('wamp://127.0.0.1:9090?topic=stats');
```

There are available options:

```
* 'host' => '127.0.0.1',
* 'port' => '9090',
* 'topic' => 'stats',
* 'max_retries' => 15,
* 'initial_retry_delay' => 1.5,
* 'max_retry_delay' => 300,
* 'retry_delay_growth' => 1.5,
```

## Symfony App

You have to register some services in order to incorporate monitoring facilities into your Symfony application.

```yaml
services:
Enqueue\Monitoring\GenericStatsStorageFactory: ~

Enqueue\Monitoring\StatsStorage:
factory: ['@Enqueue\Monitoring\GenericStatsStorageFactory', 'create']
arguments: ['influxdb://127.0.0.1:8086?db=foo']

Enqueue\Monitoring\ConsumerMonitoringExtension:
arguments:
- '@Enqueue\Monitoring\StatsStorage'
tags:
# if you want to monitor transport consumer
- { name: 'enqueue.transport.consumption_extension', transport: 'default' }

# if you want to monitor client consumer
- { name: 'enqueue.consumption_extension', client: 'default' }

# if you want to monitor sent messages
Enqueue\Monitoring\ClientMonitoringExtension:
arguments:
- '@Enqueue\Monitoring\StatsStorage'
- '@logger'
tags:
- { name: 'enqueue.client_extension', client: 'default' }
```

[back to index](index.md)
5 changes: 5 additions & 0 deletions docs/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made
* [Remote Procedure Call (RPC)](#remote-procedure-call-rpc)
* [Client](#client)
* [Cli commands](#cli-commands)
* [Monitoring](#monitoring)

## Transport

Expand Down Expand Up @@ -281,4 +282,8 @@ and starts the consumption from the console:
$ app.php consume
```

## Monitoring

There is a tool that can track sent\consumed messages as well as consumer performance. Read more [here](monitoring.md)

[back to index](index.md)
Loading