|
1 |
| -from collections import defaultdict |
2 | 1 | from logging import Logger
|
3 | 2 |
|
4 | 3 | import eventlet
|
5 |
| -from eventlet.queue import Queue, Empty |
6 |
| -from kombu import Connection |
7 |
| -from kombu.messaging import Producer |
8 |
| - |
9 |
| -from lib import models, config |
10 | 4 |
|
11 | 5 |
|
12 | 6 | class SubmitMonitor:
|
13 | 7 | def __init__(self, logger: Logger, interval: float = 10):
|
14 | 8 | self._logger = logger
|
15 |
| - self._q = Queue() |
16 | 9 |
|
17 | 10 | self._ok_submits = 0
|
18 | 11 | self._bad_submits = 0
|
19 |
| - self._connections = 0 |
| 12 | + self._requests = 0 |
20 | 13 |
|
21 | 14 | self._interval = interval
|
22 | 15 |
|
23 | 16 | self._was_ok = self._ok_submits
|
24 | 17 | self._was_bad = self._bad_submits
|
25 |
| - self._was_conn = self._connections |
26 |
| - |
27 |
| - self._labels = ['attacker_id', 'victim_id', 'task_id', 'submit_ok'] |
28 |
| - |
29 |
| - def add(self, ar: models.AttackResult) -> None: |
30 |
| - self._q.put_nowait(ar) |
31 |
| - if ar.submit_ok: |
32 |
| - self.inc_ok() |
33 |
| - else: |
34 |
| - self.inc_bad() |
| 18 | + self._was_requests = self._requests |
35 | 19 |
|
36 | 20 | def inc_ok(self) -> None:
|
37 | 21 | self._ok_submits += 1
|
38 | 22 |
|
39 | 23 | def inc_bad(self) -> None:
|
40 | 24 | self._bad_submits += 1
|
41 | 25 |
|
42 |
| - def inc_conns(self) -> None: |
43 |
| - self._connections += 1 |
| 26 | + def inc_requests(self) -> None: |
| 27 | + self._requests += 1 |
44 | 28 |
|
45 | 29 | def _process_statistics(self) -> None:
|
46 | 30 | new_ok, new_bad = self._ok_submits, self._bad_submits
|
47 |
| - new_conn = self._connections |
| 31 | + new_requests = self._requests |
48 | 32 | self._logger.info(
|
49 | 33 | f"OK: {new_ok - self._was_ok:>6}, "
|
50 | 34 | f"BAD: {new_bad - self._was_bad:>6}, "
|
51 |
| - f"CONN: {new_conn - self._was_conn:>6}, " |
| 35 | + f"REQ: {new_requests - self._was_requests:>6}, " |
52 | 36 | f"TOTOK: {new_ok:>6}, TOTBAD: {new_bad:>6}, "
|
53 |
| - f"TOTCONN: {new_conn:>6}" |
| 37 | + f"TOTREQ: {new_requests:>6}" |
54 | 38 | )
|
55 | 39 | self._was_ok = new_ok
|
56 | 40 | self._was_bad = new_bad
|
57 |
| - self._was_conn = new_conn |
58 |
| - |
59 |
| - def _process_attacks_queue(self) -> None: |
60 |
| - conn = Connection(config.get_broker_url()) |
61 |
| - with conn.channel() as channel: |
62 |
| - producer = Producer(channel) |
63 |
| - by_label = defaultdict(list) |
64 |
| - while not self._q.empty(): |
65 |
| - try: |
66 |
| - ar: models.AttackResult = self._q.get_nowait() |
67 |
| - except Empty: |
68 |
| - continue |
69 |
| - else: |
70 |
| - by_label[ar.get_label_key()].append(ar) |
71 |
| - |
72 |
| - for ar_list in by_label.values(): |
73 |
| - if not ar_list: |
74 |
| - continue |
75 |
| - |
76 |
| - monitor_message = { |
77 |
| - 'type': 'flag_submit', |
78 |
| - 'data': ar_list[0].get_label_values(), |
79 |
| - 'value': len(ar_list), |
80 |
| - } |
81 |
| - |
82 |
| - producer.publish( |
83 |
| - monitor_message, |
84 |
| - exchange='', |
85 |
| - routing_key='forcad-monitoring', |
86 |
| - ) |
| 41 | + self._was_requests = new_requests |
87 | 42 |
|
88 | 43 | def __call__(self) -> None:
|
89 | 44 | while True:
|
90 | 45 | try:
|
91 | 46 | self._process_statistics()
|
92 |
| - self._process_attacks_queue() |
93 | 47 | except Exception as e:
|
94 | 48 | self._logger.error("Error in monitoring: %s", str(e))
|
95 | 49 | eventlet.sleep(self._interval)
|
0 commit comments