-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_acrmonitor.py
68 lines (62 loc) · 1.78 KB
/
test_acrmonitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from io import BytesIO
from unittest.mock import patch
from freezegun import freeze_time
from kafka.consumer.fetcher import ConsumerRecord
from urllib3.response import HTTPResponse
from acrmonitor import read_from_event, send_from_data
@patch("acrmonitor.send_from_data")
@patch("acrmonitor.Minio")
@patch("acrmonitor.ZabbixSender")
def test_read_from_event(sender, mc, send_from_data):
    """Check that ``read_from_event`` hands the fetched entry to ``send_from_data``.

    The mocked Minio client's ``get_object`` returns a response whose body is
    a JSON array containing a single object; the test expects that object to
    be passed on together with the sender and the Zabbix host/key.
    """
    # Patches are applied bottom-up, hence the (sender, mc, send_from_data)
    # argument order.
    host = "zabbix.example.org"
    item_key = "key.example.org"

    # Stub the object-store download with an in-memory JSON payload.
    stored_object = BytesIO(b'[{"my":"data"}]')
    mc.get_object.return_value = HTTPResponse(stored_object)

    # Kafka record carrying the bucket notification event as its value.
    record_fields = {
        "topic": "topic",
        "partition": "partition",
        "offset": 0,
        "timestamp": 0,
        "timestamp_type": "",
        "key": "key",
        "value": '{"source":"minio:s3..acrcloud.raw","type":"com.amazonaws.s3.s3:ObjectCreated:Put","data":{}}',
        "headers": {},
        "checksum": "",
        "serialized_key_size": 0,
        "serialized_value_size": 0,
        "serialized_header_size": 0,
    }
    msg = ConsumerRecord(**record_fields)

    read_from_event(msg, mc, sender, zabbix_host=host, zabbix_key=item_key)

    expected_entry = {"my": "data"}
    send_from_data.assert_called_with(sender, host, item_key, expected_entry)
@patch("acrmonitor.ZabbixSender")
@patch("acrmonitor.ZabbixMetric")
@freeze_time("1970-01-01 16:20:00")
def test_send_from_data(metric, sender):
    """Check that ``send_from_data`` builds one metric and sends it.

    The metric is expected to carry the given host and key, the ``acrid``
    from the metadata as its value, and a clock of 47520.0; the sender is
    expected to receive a list containing that metric.
    """
    host = "zabbix.example.com"
    item_key = "key.example.com"
    metadata = {
        "timestamp_utc": "1970-01-01 13:12:00",
        "acrid": "1234567890",
    }

    send_from_data(
        sender=sender,
        zabbix_host=host,
        zabbix_key=item_key,
        data={"metadata": metadata},
    )

    # 47520.0 == 13 * 3600 + 12 * 60, i.e. the 13:12:00 time of day from
    # ``timestamp_utc`` in seconds (presumably read as a UTC epoch value).
    metric.assert_called_with(
        host=host,
        key=item_key,
        value="1234567890",
        clock=47520.0,
    )

    # Calling the mock yields its ``return_value``; this must happen only
    # after the ``metric`` assertion above, as it records another call.
    built_metric = metric()
    sender.send.assert_called_with([built_metric])