Skip to content

Commit d4d0f17

Browse files
committed
monitoring
1 parent 5b62fd1 commit d4d0f17

File tree

5 files changed

+218
-5
lines changed

5 files changed

+218
-5
lines changed

pkg/monitoring/InfluxDbStorage.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ public function __construct(Client $client, string $dbName)
4343
$this->serializer = new JsonSerializer();
4444
}
4545

46-
public function pushConsumerStats(ConsumerStats $event)
46+
public function pushConsumerStats(ConsumerStats $event): void
4747
{
4848
// echo $this->serializer->toString($event).PHP_EOL;
4949
}
5050

51-
public function pushMessageStats(MessageStats $event)
51+
public function pushMessageStats(MessageStats $event): void
5252
{
5353
$tags = [
5454
'queue' => $event->getQueue(),

pkg/monitoring/StatsStorage.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
interface StatsStorage
88
{
9-
public function pushConsumerStats(ConsumerStats $event);
9+
public function pushConsumerStats(ConsumerStats $event): void;
1010

11-
public function pushMessageStats(MessageStats $event);
11+
public function pushMessageStats(MessageStats $event): void;
1212
}

pkg/monitoring/WampStorage.php

+205
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Monitoring;
6+
7+
use Enqueue\Dsn\Dsn;
8+
use Thruway\ClientSession;
9+
use Thruway\Peer\Client;
10+
use Thruway\Transport\PawlTransportProvider;
11+
12+
class WampStorage implements StatsStorage
13+
{
14+
/**
15+
* @var array
16+
*/
17+
private $config;
18+
19+
/**
20+
* @var Client
21+
*/
22+
private $client;
23+
24+
/**
25+
* @var Serializer
26+
*/
27+
private $serialiser;
28+
29+
/**
30+
* @var ClientSession
31+
*/
32+
private $session;
33+
34+
/**
35+
* @var Stats
36+
*/
37+
private $stats;
38+
39+
/**
40+
* The config could be an array, string DSN or null. In case of null it will attempt to connect to Thruway localhost.
41+
*
42+
* $config = [
43+
* 'dsn' => 'wamp://127.0.0.1:9090',
44+
* 'host' => '127.0.0.1',
45+
* 'port' => '9090',
46+
* 'topic' => 'stats',
47+
* 'max_retries' => 15,
48+
* 'initial_retry_delay' => 1.5,
49+
* 'max_retry_delay' => 300,
50+
* 'retry_delay_growth' => 1.5,
51+
* ]
52+
*
53+
* or
54+
*
55+
* wamp://127.0.0.1:9090?max_retries=10
56+
*
57+
* @param array|string|null $config
58+
*/
59+
public function __construct($config = 'wamp:')
60+
{
61+
if (empty($config)) {
62+
$config = $this->parseDsn('wamp:');
63+
} elseif (is_string($config)) {
64+
$config = $this->parseDsn($config);
65+
} elseif (is_array($config)) {
66+
$config = empty($config['dsn']) ? $config : $this->parseDsn($config['dsn']);
67+
} else {
68+
throw new \LogicException('The config must be either an array of options, a DSN string or null');
69+
}
70+
71+
$config = array_replace([
72+
'host' => '127.0.0.1',
73+
'port' => '9090',
74+
'topic' => 'stats',
75+
'max_retries' => 15,
76+
'initial_retry_delay' => 1.5,
77+
'max_retry_delay' => 300,
78+
'retry_delay_growth' => 1.5,
79+
], $config);
80+
81+
$this->config = $config;
82+
83+
$this->serialiser = new JsonSerializer();
84+
}
85+
86+
public function pushConsumerStats(ConsumerStats $event): void
87+
{
88+
$this->push($event);
89+
}
90+
91+
public function pushMessageStats(MessageStats $event): void
92+
{
93+
$this->push($event);
94+
}
95+
96+
private function push(Stats $stats)
97+
{
98+
$init = false;
99+
$this->stats = $stats;
100+
101+
if (null === $this->client) {
102+
$init = true;
103+
104+
$this->client = $this->createClient();
105+
$this->client->setAttemptRetry(true);
106+
$this->client->on('open', function (ClientSession $session) {
107+
$this->session = $session;
108+
109+
$this->doSendMessageIfPossible();
110+
});
111+
112+
$this->client->on('close', function () {
113+
if ($this->session === $this->client->getSession()) {
114+
$this->session = null;
115+
}
116+
});
117+
118+
$this->client->on('error', function () {
119+
if ($this->session === $this->client->getSession()) {
120+
$this->session = null;
121+
}
122+
});
123+
124+
$this->client->on('do-send', function (Stats $stats) {
125+
$onFinish = function () {
126+
$this->client->emit('do-stop');
127+
};
128+
129+
$payload = $this->serialiser->toString($stats);
130+
131+
$this->session->publish('stats', [$payload], [], ['acknowledge' => true])
132+
->then($onFinish, $onFinish);
133+
});
134+
135+
$this->client->on('do-stop', function () {
136+
$this->client->getLoop()->stop();
137+
});
138+
}
139+
140+
$this->client->getLoop()->futureTick(function () {
141+
$this->doSendMessageIfPossible();
142+
});
143+
144+
if ($init) {
145+
$this->client->start(false);
146+
}
147+
148+
$this->client->getLoop()->run();
149+
}
150+
151+
private function doSendMessageIfPossible()
152+
{
153+
if (null === $this->session) {
154+
return;
155+
}
156+
157+
if (null === $this->stats) {
158+
return;
159+
}
160+
161+
$stats = $this->stats;
162+
163+
$this->stats = null;
164+
165+
$this->client->emit('do-send', [$stats]);
166+
}
167+
168+
private function createClient(): Client
169+
{
170+
$uri = sprintf('ws://%s:%s', $this->config['host'], $this->config['port']);
171+
172+
$client = new Client('realm1');
173+
$client->addTransportProvider(new PawlTransportProvider($uri));
174+
$client->setReconnectOptions([
175+
'max_retries' => $this->config['max_retries'],
176+
'initial_retry_delay' => $this->config['initial_retry_delay'],
177+
'max_retry_delay' => $this->config['max_retry_delay'],
178+
'retry_delay_growth' => $this->config['retry_delay_growth'],
179+
]);
180+
181+
return $client;
182+
}
183+
184+
private function parseDsn(string $dsn): array
185+
{
186+
$dsn = new Dsn($dsn);
187+
188+
if (false === in_array($dsn->getSchemeProtocol(), ['wamp', 'ws'], true)) {
189+
throw new \LogicException(sprintf(
190+
'The given scheme protocol "%s" is not supported. It must be "wamp"',
191+
$dsn->getSchemeProtocol()
192+
));
193+
}
194+
195+
return array_filter(array_replace($dsn->getQuery(), [
196+
'host' => $dsn->getHost(),
197+
'port' => $dsn->getPort(),
198+
'topic' => $dsn->getQueryParameter('topic'),
199+
'max_retries' => $dsn->getInt('max_retries'),
200+
'initial_retry_delay' => $dsn->getFloat('initial_retry_delay'),
201+
'max_retry_delay' => $dsn->getInt('max_retry_delay'),
202+
'retry_delay_growth' => $dsn->getFloat('retry_delay_growth'),
203+
]), function ($value) { return null !== $value; });
204+
}
205+
}

pkg/monitoring/composer.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
"require": {
99
"php": "^7.1.3",
1010
"enqueue/enqueue": "0.9.x-dev",
11-
"ramsey/uuid": "^3"
11+
"ramsey/uuid": "^3",
12+
"enqueue/dsn": "0.9.x-dev"
1213
},
1314
"require-dev": {
1415
"phpunit/phpunit": "~5.4.0",

wamp-consumer.php

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
/**
3+
* Created by FormaPro for Alea.
4+
* Date: 2/11/18
5+
* Time: 10:35
6+
*/
7+

0 commit comments

Comments
 (0)