-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
test_activemq.py
100 lines (79 loc) · 3.75 KB
/
test_activemq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import random
import stomp
import string
import sys
import unittest
from xpack_metricbeat import XPackTest, metricbeat
@metricbeat.parameterized_with_supported_versions
class ActiveMqTest(XPackTest):
COMPOSE_SERVICES = ['activemq']
def get_activemq_module_config(self, metricset):
return {
'name': 'activemq',
'metricsets': [metricset],
'period': '5s',
'hosts': self.get_hosts(),
'path': '/api/jolokia/?ignoreErrors=true&canonicalNaming=false',
'username': 'admin',
'password': 'admin'
}
def get_hosts(self):
return [self.compose_host(port='8161/tcp')]
def get_stomp_host_port(self):
host_port = self.compose_host(port='61613/tcp')
s = host_port.split(':')
return s[0], int(s[1])
def destination_metrics_collected(self, destination_type, destination_name):
if self.output_lines() == 0:
return False
output = self.read_output_json()
for evt in output:
if self.all_messages_enqueued(evt, destination_type, destination_name):
return True
return False
def verify_destination_metrics_collection(self, destination_type):
from stomp import Connection
self.render_config_template(modules=[self.get_activemq_module_config(destination_type)])
proc = self.start_beat(home=self.beat_path)
destination_name = ''.join(random.choice(string.ascii_lowercase) for i in range(10))
conn = Connection([self.get_stomp_host_port()])
conn.start()
conn.connect(wait=True)
conn.send('/{}/{}'.format(destination_type, destination_name), 'first message')
conn.send('/{}/{}'.format(destination_type, destination_name), 'second message')
self.wait_until(lambda: self.destination_metrics_collected(destination_type, destination_name))
proc.check_kill_and_wait()
self.assert_no_logged_warnings()
output = self.read_output_json()
passed = False
for evt in output:
if self.all_messages_enqueued(evt, destination_type, destination_name):
assert 0 < evt['activemq'][destination_type]['messages']['size']['avg']
self.assert_fields_are_documented(evt)
passed = True
conn.disconnect()
assert passed
def all_messages_enqueued(self, evt, destination_type, destination_name):
return destination_type in evt['activemq'] and destination_name == evt['activemq'][destination_type]['name'] \
and 2 == evt['activemq'][destination_type]['messages']['enqueue']['count']
@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, 'integration test')
def test_broker_metrics_collected(self):
self.render_config_template(modules=[self.get_activemq_module_config('broker')])
proc = self.start_beat(home=self.beat_path)
self.wait_until(lambda: self.output_lines() > 0)
proc.check_kill_and_wait()
self.assert_no_logged_warnings()
output = self.read_output_json()
for evt in output:
assert 'name' in evt['activemq']['broker']
assert 'pct' in evt['activemq']['broker']['memory']['broker']
assert 'count' in evt['activemq']['broker']['producers']
assert 'count' in evt['activemq']['broker']['consumers']
self.assert_fields_are_documented(evt)
@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, 'integration test')
def test_queue_metrics_collected(self):
self.verify_destination_metrics_collection('queue')
@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, 'integration test')
def test_topic_metrics_collected(self):
self.verify_destination_metrics_collection('topic')