Skip to content

[monitoring] Add support of Datadog #716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
"predis/predis": "^1.1",
"thruway/pawl-transport": "^0.5.0",
"voryx/thruway": "^0.5.3",
"influxdb/influxdb-php": "^1.14"
"influxdb/influxdb-php": "^1.14",
"datadog/php-datadogstatsd": "^1.3"
},
"require-dev": {
"phpunit/phpunit": "^5.5",
Expand Down
Binary file added docs/images/datadog_monitoring.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
52 changes: 51 additions & 1 deletion docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ With it, you can control how many messages were sent, how many processed success
How many consumers are working, their up time, processed messages stats, memory usage and system load.
The tool could be integrated with virtually any analytics and monitoring platform.
There are several integration:
* [Datadog StatsD](https://datadoghq.com)
* [InfluxDB](https://www.influxdata.com/) and [Grafana](https://grafana.com/)
* [WAMP (Web Application Messaging Protocol)](https://wamp-proto.org/)

We are working on a JS\WAMP based real-time UI tool, for more information please [contact us](opensource@forma-pro.com).

![Grafana Monitoring](images/grafana_monitoring.jpg)
Expand All @@ -30,6 +30,7 @@ We are working on a JS\WAMP based real-time UI tool, for more information please
* [Consumption extension](#consumption-extension)
* [Enqueue Client Extension](#enqueue-client-extension)
* [InfluxDB Storage](#influxdb-storage)
* [Datadog Storage](#datadog-storage)
* [WAMP (Web Socket Messaging Protocol) Storage](#wamp-(web-socket-messaging-protocol)-storage)
* [Symfony App](#symfony-app)

Expand Down Expand Up @@ -237,6 +238,50 @@ There are available options:
* 'measurementConsumers' => 'consumers',
```

## Datadog storage

Install additional packages:

```
composer req datadog/php-datadogstatsd:^1.3
```

```php
<?php
use Enqueue\Monitoring\GenericStatsStorageFactory;

$statsStorage = (new GenericStatsStorageFactory())->create('datadog://127.0.0.1:8125');
```

For best experience please adjust units and types in metric summary.

Example dashboard:

![Datadog monitoring](images/datadog_monitoring.png)


There are available options (and all available metrics):

```
* 'host' => '127.0.0.1',
* 'port' => '8125',
* 'batched' => true, // performance boost
* 'global_tags' => '', // should contain keys and values
* 'metric.messages.sent' => 'enqueue.messages.sent',
* 'metric.messages.consumed' => 'enqueue.messages.consumed',
* 'metric.messages.redelivered' => 'enqueue.messages.redelivered',
* 'metric.messages.failed' => 'enqueue.messages.failed',
* 'metric.consumers.started' => 'enqueue.consumers.started',
* 'metric.consumers.finished' => 'enqueue.consumers.finished',
* 'metric.consumers.failed' => 'enqueue.consumers.failed',
* 'metric.consumers.received' => 'enqueue.consumers.received',
* 'metric.consumers.acknowledged' => 'enqueue.consumers.acknowledged',
* 'metric.consumers.rejected' => 'enqueue.consumers.rejected',
* 'metric.consumers.requeued' => 'enqueue.consumers.requeued',
* 'metric.consumers.memoryUsage' => 'enqueue.consumers.memoryUsage',
```


## WAMP (Web Socket Messaging Protocol) Storage

Install additional packages:
Expand Down Expand Up @@ -280,6 +325,11 @@ enqueue:
transport: 'amqp://guest:guest@foo:5672/%2f'
monitoring: 'wamp://127.0.0.1:9090?topic=stats'
client: ~

datadog:
transport: 'amqp://guest:guest@foo:5672/%2f'
monitoring: 'datadog://127.0.0.1:8125?batched=false'
client: ~
```

[back to index](index.md)
4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@
<testsuite name="wamp transport">
<directory>pkg/wamp/Tests</directory>
</testsuite>

<testsuite name="monitoring">
<directory>pkg/monitoring/Tests</directory>
</testsuite>
</testsuites>

<php>
Expand Down
173 changes: 173 additions & 0 deletions pkg/monitoring/DatadogStorage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
<?php

declare(strict_types=1);

namespace Enqueue\Monitoring;

use DataDog\BatchedDogStatsd;
use DataDog\DogStatsd;
use Enqueue\Client\Config;
use Enqueue\Dsn\Dsn;

class DatadogStorage implements StatsStorage
{
/**
* @var array
*/
private $config;

/**
* @var BatchedDogStatsd
*/
private $datadog;

public function __construct($config = 'datadog:')
{
if (false === class_exists(DogStatsd::class)) {
throw new \LogicException('Seems client library is not installed. Please install "datadog/php-datadogstatsd"');
}

$this->config = $this->prepareConfig($config);

if (null === $this->datadog) {
if (true === filter_var($this->config['batched'], FILTER_VALIDATE_BOOLEAN)) {
$this->datadog = new BatchedDogStatsd($this->config);
} else {
$this->datadog = new DogStatsd($this->config);
}
}
}

public function pushConsumerStats(ConsumerStats $stats): void
{
$queues = $stats->getQueues();
array_walk($queues, function (string $queue) use ($stats) {
$tags = [
'queue' => $queue,
'consumerId' => $stats->getConsumerId(),
];

if ($stats->getFinishedAtMs()) {
$values['finishedAtMs'] = $stats->getFinishedAtMs();
}

$this->datadog->gauge($this->config['metric.consumers.started'], (int) $stats->isStarted(), 1, $tags);
$this->datadog->gauge($this->config['metric.consumers.finished'], (int) $stats->isFinished(), 1, $tags);
$this->datadog->gauge($this->config['metric.consumers.failed'], (int) $stats->isFailed(), 1, $tags);
$this->datadog->gauge($this->config['metric.consumers.received'], $stats->getReceived(), 1, $tags);
$this->datadog->gauge($this->config['metric.consumers.acknowledged'], $stats->getAcknowledged(), 1, $tags);
$this->datadog->gauge($this->config['metric.consumers.rejected'], $stats->getRejected(), 1, $tags);
$this->datadog->gauge($this->config['metric.consumers.requeued'], $stats->getRejected(), 1, $tags);
$this->datadog->gauge($this->config['metric.consumers.memoryUsage'], $stats->getMemoryUsage(), 1, $tags);
});
}

public function pushSentMessageStats(SentMessageStats $stats): void
{
$tags = [
'destination' => $stats->getDestination(),
];

$properties = $stats->getProperties();
if (false === empty($properties[Config::TOPIC])) {
$tags['topic'] = $properties[Config::TOPIC];
}

if (false === empty($properties[Config::COMMAND])) {
$tags['command'] = $properties[Config::COMMAND];
}

$this->datadog->increment($this->config['metric.messages.sent'], 1, $tags);
}

public function pushConsumedMessageStats(ConsumedMessageStats $stats): void
{
$tags = [
'queue' => $stats->getQueue(),
'status' => $stats->getStatus(),
];

if (ConsumedMessageStats::STATUS_FAILED === $stats->getStatus()) {
$this->datadog->increment($this->config['metric.messages.failed'], 1, $tags);
}

if ($stats->isRedelivered()) {
$this->datadog->increment($this->config['metric.messages.redelivered'], 1, $tags);
}

$runtime = $stats->getTimestampMs() - $stats->getReceivedAtMs();
$this->datadog->histogram($this->config['metric.messages.consumed'], $runtime, 1, $tags);
}

private function parseDsn(string $dsn): array
{
$dsn = Dsn::parseFirst($dsn);

if ('datadog' !== $dsn->getSchemeProtocol()) {
throw new \LogicException(sprintf(
'The given scheme protocol "%s" is not supported. It must be "datadog"',
$dsn->getSchemeProtocol()
));
}

return array_filter(array_replace($dsn->getQuery(), [
'host' => $dsn->getHost(),
'port' => $dsn->getPort(),
'global_tags' => $dsn->getString('global_tags'),
'batched' => $dsn->getString('batched'),
'metric.messages.sent' => $dsn->getString('metric.messages.sent'),
'metric.messages.consumed' => $dsn->getString('metric.messages.consumed'),
'metric.messages.redelivered' => $dsn->getString('metric.messages.redelivered'),
'metric.messages.failed' => $dsn->getString('metric.messages.failed'),
'metric.consumers.started' => $dsn->getString('metric.consumers.started'),
'metric.consumers.finished' => $dsn->getString('metric.consumers.finished'),
'metric.consumers.failed' => $dsn->getString('metric.consumers.failed'),
'metric.consumers.received' => $dsn->getString('metric.consumers.received'),
'metric.consumers.acknowledged' => $dsn->getString('metric.consumers.acknowledged'),
'metric.consumers.rejected' => $dsn->getString('metric.consumers.rejected'),
'metric.consumers.requeued' => $dsn->getString('metric.consumers.requeued'),
'metric.consumers.memoryUsage' => $dsn->getString('metric.consumers.memoryUsage'),
]), function ($value) {
return null !== $value;
});
}

/**
* @param $config
*
* @return array
*/
private function prepareConfig($config): array
{
if (empty($config)) {
$config = $this->parseDsn('datadog:');
} elseif (\is_string($config)) {
$config = $this->parseDsn($config);
} elseif (\is_array($config)) {
$config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']);
} elseif ($config instanceof DogStatsd) {
$this->datadog = $config;
$config = [];
} else {
throw new \LogicException('The config must be either an array of options, a DSN string or null');
}

return array_replace([
'host' => 'localhost',
'port' => 8125,
'batched' => true,
'metric.messages.sent' => 'enqueue.messages.sent',
'metric.messages.consumed' => 'enqueue.messages.consumed',
'metric.messages.redelivered' => 'enqueue.messages.redelivered',
'metric.messages.failed' => 'enqueue.messages.failed',
'metric.consumers.started' => 'enqueue.consumers.started',
'metric.consumers.finished' => 'enqueue.consumers.finished',
'metric.consumers.failed' => 'enqueue.consumers.failed',
'metric.consumers.received' => 'enqueue.consumers.received',
'metric.consumers.acknowledged' => 'enqueue.consumers.acknowledged',
'metric.consumers.rejected' => 'enqueue.consumers.rejected',
'metric.consumers.requeued' => 'enqueue.consumers.requeued',
'metric.consumers.memoryUsage' => 'enqueue.consumers.memoryUsage',
], $config);
}
}
12 changes: 6 additions & 6 deletions pkg/monitoring/GenericStatsStorageFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ class GenericStatsStorageFactory implements StatsStorageFactory
{
public function create($config): StatsStorage
{
if (is_string($config)) {
if (\is_string($config)) {
$config = ['dsn' => $config];
}

if (false == is_array($config)) {
if (false === \is_array($config)) {
throw new \InvalidArgumentException('The config must be either array or DSN string.');
}

if (false == array_key_exists('dsn', $config)) {
if (false === array_key_exists('dsn', $config)) {
throw new \InvalidArgumentException('The config must have dsn key set.');
}

$dsn = Dsn::parseFirst($config['dsn']);

if ($storageClass = $this->findStorageClass($dsn, Resources::getKnownStorages())) {
return new $storageClass(1 === count($config) ? $config['dsn'] : $config);
return new $storageClass(1 === \count($config) ? $config['dsn'] : $config);
}

throw new \LogicException(sprintf('A given scheme "%s" is not supported.', $dsn->getScheme()));
Expand All @@ -41,7 +41,7 @@ private function findStorageClass(Dsn $dsn, array $factories): ?string
continue;
}

if (false == in_array($protocol, $info['schemes'], true)) {
if (false === \in_array($protocol, $info['schemes'], true)) {
continue;
}

Expand All @@ -53,7 +53,7 @@ private function findStorageClass(Dsn $dsn, array $factories): ?string
}

foreach ($factories as $storageClass => $info) {
if (false == in_array($protocol, $info['schemes'], true)) {
if (false === \in_array($protocol, $info['schemes'], true)) {
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/monitoring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Queue Monitoring tool. Track sent, consumed messages. Consumers performances.
* Could be used with any message queue library.
* Could be intergrated to any PHP framework
* Could send stats to any analytical platform
* Supports Grafana and WAMP out of the box.
* Supports Datadog, InfluxDb, Grafana and WAMP out of the box.
* Provides integration for Enqueue

[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby)
Expand Down
5 changes: 5 additions & 0 deletions pkg/monitoring/Resources.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public static function getKnownStorages(): array
'supportedSchemeExtensions' => [],
];

$map[DatadogStorage::class] = [
'schemes' => ['datadog'],
'supportedSchemeExtensions' => [],
];

self::$knownStorages = $map;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public static function getConfiguration(string $name = 'monitoring'): ArrayNodeD
->info(sprintf('The "%s" option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at stats storage constructor doc block.', $name))
->beforeNormalization()
->always(function ($v) {
if (is_array($v)) {
if (isset($v['storage_factory_class']) && isset($v['storage_factory_service'])) {
if (\is_array($v)) {
if (isset($v['storage_factory_class'], $v['storage_factory_service'])) {
throw new \LogicException('Both options storage_factory_class and storage_factory_service are set. Please choose one.');
}

Expand Down
Loading