@@ -23,13 +23,16 @@ class InfluxDbStorage implements StatsStorage
2323 */
2424 private $ measurMessages ;
2525
26+ /**
27+ * @var string
28+ */
29+ private $ measurConsumers ;
30+
2631 /**
2732 * @var Database
2833 */
2934 private $ database ;
3035
31- private $ serializer ;
32-
3336 /**
3437 * @param Client $client
3538 * @param string $dbName
@@ -38,14 +41,41 @@ public function __construct(Client $client, string $dbName)
3841 {
3942 $ this ->client = $ client ;
4043 $ this ->dbName = $ dbName ;
41- $ this ->measurMessages = 'msg ' ;
42-
43- $ this ->serializer = new JsonSerializer ();
44+ $ this ->measurMessages = 'messages ' ;
45+ $ this ->measurConsumers = 'consumers ' ;
4446 }
4547
4648 public function pushConsumerStats (ConsumerStats $ event ): void
4749 {
48- // echo $this->serializer->toString($event).PHP_EOL;
50+ $ points = [];
51+
52+ foreach ($ event ->getQueues () as $ queue ) {
53+ $ tags = [
54+ 'queue ' => $ queue ,
55+ 'consumerId ' => $ event ->getConsumerId (),
56+ ];
57+
58+ $ values = [
59+ 'startedAtMs ' => $ event ->getStartedAtMs (),
60+ 'started ' => $ event ->isStarted (),
61+ 'finished ' => $ event ->isFinished (),
62+ 'failed ' => $ event ->isFailed (),
63+ 'received ' => $ event ->getReceived (),
64+ 'acknowledged ' => $ event ->getAcknowledged (),
65+ 'rejected ' => $ event ->getRejected (),
66+ 'requeued ' => $ event ->getRequeued (),
67+ 'memoryUsage ' => $ event ->getMemoryUsage (),
68+ 'systemLoad ' => $ event ->getSystemLoad (),
69+ ];
70+
71+ if ($ event ->getFinishedAtMs ()) {
72+ $ values ['finishedAtMs ' ] = $ event ->getFinishedAtMs ();
73+ }
74+
75+ $ points [] = new Point ($ this ->measurConsumers , null , $ tags , $ values , $ event ->getTimestampMs ());
76+ }
77+
78+ $ this ->getDb ()->writePoints ($ points , Database::PRECISION_MILLISECONDS );
4979 }
5080
5181 public function pushMessageStats (MessageStats $ event ): void
0 commit comments