diff --git a/src/chia_log/handlers/condition_checkers/__init__.py b/src/chia_log/handlers/condition_checkers/__init__.py index 8d41173..476c740 100644 --- a/src/chia_log/handlers/condition_checkers/__init__.py +++ b/src/chia_log/handlers/condition_checkers/__init__.py @@ -6,6 +6,7 @@ from src.notifier import Event from ...parsers.finished_signage_point_parser import FinishedSignagePointMessage from ...parsers.harvester_activity_parser import HarvesterActivityMessage +from ...parsers.farmer_server_parser import FarmerServerMessage class FinishedSignageConditionChecker(ABC): @@ -18,3 +19,9 @@ class HarvesterConditionChecker(ABC): @abstractmethod def check(self, obj: HarvesterActivityMessage) -> Optional[Event]: pass + + +class FarmerConditionChecker(ABC): + @abstractmethod + def check(self, obj: FarmerServerMessage) -> Optional[Event]: + pass diff --git a/src/chia_log/handlers/condition_checkers/remote_harvester_activity.py b/src/chia_log/handlers/condition_checkers/remote_harvester_activity.py new file mode 100644 index 0000000..22941f0 --- /dev/null +++ b/src/chia_log/handlers/condition_checkers/remote_harvester_activity.py @@ -0,0 +1,63 @@ +# std +import logging +from typing import Optional +from dataclasses import dataclass +from datetime import datetime + +# project +from src.notifier import Event, EventService, EventType, EventPriority +from . import FarmerConditionChecker +from ...parsers.farmer_server_parser import FarmerServerMessage + + +@dataclass +class RemoteHarvester: + """Record of last activity""" + last_activity: datetime + peer_hash: str + ip_addr: str + +class RemoteHarvesterActivity(FarmerConditionChecker): + """The remote harvesters connected to the farmer_server + are not expected to disappear. Disappearing remote harvesters + indicate network or remote system issues. + """ + + def __init__(self): + logging.info("Enabled check for disappearing remote harvesters.") + self._warning_threshold = 300 + self._remote_harvesters = {} + + def check(self, obj: FarmerServerMessage) -> Optional[Event]: + #update last_activity + if self._remote_harvesters.get(obj.peer_hash, False): + self._remote_harvesters[obj.peer_hash].last_activity = obj.timestamp + self._remote_harvesters[obj.peer_hash].ip_addr = obj.ip_addr + else: + self._remote_harvesters[obj.peer_hash] = RemoteHarvester( + obj.timestamp, + obj.peer_hash, + obj.ip_addr + ) + logging.info(f"New remote harvester: {obj.ip_addr}") + + + event = None + for peer_hash in self._remote_harvesters: + remote_harvester = self._remote_harvesters[peer_hash] + seconds_since_last = (obj.timestamp - remote_harvester.last_activity).seconds + if seconds_since_last > self._warning_threshold: + message = ( + f"Remote harvester offline: {remote_harvester.ip_addr} " + f"did not participate for {seconds_since_last} seconds!" + ) + logging.warning(message) + event = Event( + type=EventType.USER, priority=EventPriority.HIGH, service=EventService.FARMER, message=message + ) + + # Remove offline harvester + self._remote_harvesters.pop(peer_hash) + break + + return event diff --git a/src/chia_log/handlers/farmer_server_handler.py b/src/chia_log/handlers/farmer_server_handler.py new file mode 100644 index 0000000..887e170 --- /dev/null +++ b/src/chia_log/handlers/farmer_server_handler.py @@ -0,0 +1,51 @@ +# std +import logging +from typing import List, Optional + +# project +from . import LogHandler +from ..parsers.farmer_server_parser import FarmerServerParser +from .condition_checkers import FarmerConditionChecker +from .condition_checkers.remote_harvester_activity import RemoteHarvesterActivity +from .daily_stats.stats_manager import StatsManager +from src.notifier import Event, EventService, EventType, EventPriority + + +class FarmerServerHandler(LogHandler): + """This handler parses all logs indicating farmer_server + activity and participation of remote harvester in challenges. + It holds a list of condition checkers that are evaluated for + each event to ensure that farming is going smoothly. + """ + + def __init__(self): + self._parser = FarmerServerParser() + self._cond_checkers: List[FarmerConditionChecker] = [ + RemoteHarvesterActivity(), + ] + + def handle(self, logs: str, stats_manager: Optional[StatsManager] = None) -> List[Event]: + """Process incoming logs, check all conditions + and return a list of notable events. + """ + + events = [] + activity_messages = self._parser.parse(logs) + + if len(activity_messages) > 0: + logging.debug(f"Parsed {len(activity_messages)} farmer_server messages") + events.append( + # EventService.HARVESTER because this keepalive is indicating a harvester activity + Event( + type=EventType.KEEPALIVE, priority=EventPriority.NORMAL, service=EventService.HARVESTER, message="" + ) + ) + + # Run messages through all condition checkers + for msg in activity_messages: + for checker in self._cond_checkers: + event = checker.check(msg) + if event: + events.append(event) + + return events diff --git a/src/chia_log/log_handler.py b/src/chia_log/log_handler.py index aa12512..91a89ed 100644 --- a/src/chia_log/log_handler.py +++ b/src/chia_log/log_handler.py @@ -6,6 +6,7 @@ from src.chia_log.handlers.harvester_activity_handler import HarvesterActivityHandler from src.chia_log.handlers.finished_signage_point_handler import FinishedSignagePointHandler from src.chia_log.handlers.wallet_added_coin_handler import WalletAddedCoinHandler +from src.chia_log.handlers.farmer_server_handler import FarmerServerHandler from src.chia_log.log_consumer import LogConsumerSubscriber, LogConsumer from src.notifier.notify_manager import NotifyManager @@ -29,7 +30,7 @@ def __init__( ): self._notify_manager = notify_manager self._stats_manager = stats_manager - self._handlers = [HarvesterActivityHandler(), FinishedSignagePointHandler(), WalletAddedCoinHandler()] + self._handlers = [HarvesterActivityHandler(), FinishedSignagePointHandler(), WalletAddedCoinHandler(), FarmerServerHandler()] log_consumer.subscribe(self) def consume_logs(self, logs: str): diff --git a/src/chia_log/parsers/farmer_server_parser.py b/src/chia_log/parsers/farmer_server_parser.py new file mode 100644 index 0000000..d1eceb1 --- /dev/null +++ b/src/chia_log/parsers/farmer_server_parser.py @@ -0,0 +1,53 @@ +# std +import re +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import List + +# lib +from dateutil import parser as dateutil_parser + + +@dataclass +class FarmerServerMessage: + """Parsed information from farmer_server logs""" + + timestamp: datetime + peer_hash: str + ip_addr: str + + +class FarmerServerParser: + """This class can parse info log messages from the chia farmer_server + + You need to have enabled "log_level: DEBUG" in your chia config.yaml + The chia config.yaml is usually under ~/.chia/mainnet/config/config.yaml + """ + + def __init__(self): + logging.info("Enabled parser for farmer_server activity - peer infos.") + self._regex = re.compile( + r"([0-9:.]*) farmer farmer_server\s*: DEBUG\s* <\- farming_info " + r"from peer ([0-9a-z.]*) ([0-9\.:a-f]*)" + ) + + def parse(self, logs: str) -> List[FarmerServerMessage]: + """Parses all farmer_server activity messages from a bunch of logs + + :param logs: String of logs - can be multi-line + :returns: A list of parsed messages - can be empty + """ + + parsed_messages = [] + matches = self._regex.findall(logs) + for match in matches: + parsed_messages.append( + FarmerServerMessage( + timestamp=dateutil_parser.parse(match[0]), + peer_hash=match[1], + ip_addr=match[2] + ) + ) + + return parsed_messages diff --git a/tests/chia_log/handlers/test_farmer_server_handler.py b/tests/chia_log/handlers/test_farmer_server_handler.py new file mode 100644 index 0000000..8ac9347 --- /dev/null +++ b/tests/chia_log/handlers/test_farmer_server_handler.py @@ -0,0 +1,51 @@ +# std +import unittest +from pathlib import Path + +# project +from src.chia_log.handlers import farmer_server_handler +from src.notifier import EventType, EventService, EventPriority + + +class TestFarmerServerHandler(unittest.TestCase): + def setUp(self) -> None: + self.handler = farmer_server_handler.FarmerServerHandler() + self.example_logs_path = Path(__file__).resolve().parents[1] / "logs/farmer_server" + + def testNominal(self): + with open(self.example_logs_path / "nominal.txt") as f: + logs = f.readlines() + + for log in logs: + events = self.handler.handle(log) + self.assertEqual(len(events), 1, "Only expecting 1 event for keep-alive") + self.assertEqual(events[0].type, EventType.KEEPALIVE, "Unexpected type") + self.assertEqual(events[0].priority, EventPriority.NORMAL, "Unexpected priority") + self.assertEqual(events[0].service, EventService.HARVESTER, "Unexpected service") + + def testDisconnectedHarvester(self): + with open(self.example_logs_path / "disappearing_harvester.txt") as f: + logs = f.readlines() + + expected_message = "Remote harvester offline: 255.255.255.255 did not participate for 385 seconds!" + expected_number_events = [ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1] + + checked = 0 + for log, number_events in zip(logs, expected_number_events): + events = self.handler.handle(log) + if len(events) > 0: + print(len(events)) + self.assertEqual(len(events), number_events, "Unexpected number of events") + self.assertEqual(events[0].type, EventType.KEEPALIVE, "Unexpected type") + self.assertEqual(events[0].priority, EventPriority.NORMAL, "Unexpected priority") + self.assertEqual(events[0].service, EventService.HARVESTER, "Unexpected service") + if len(events) == 2: + self.assertEqual(events[1].type, EventType.USER, "Unexpected type") + self.assertEqual(events[1].priority, EventPriority.HIGH, "Unexpected priority") + self.assertEqual(events[1].service, EventService.FARMER, "Unexpected service") + self.assertEqual(events[1].message, expected_message, "Unexpected message") + checked += 1 + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/chia_log/logs/farmer_server/disappearing_harvester.txt b/tests/chia_log/logs/farmer_server/disappearing_harvester.txt new file mode 100644 index 0000000..2b1d355 --- /dev/null +++ b/tests/chia_log/logs/farmer_server/disappearing_harvester.txt @@ -0,0 +1,16 @@ +2021-05-05T16:02:06.632 farmer farmer_server : DEBUG <- farming_info from peer 20388420 127.0.0.1 +2021-05-05T16:02:06.656 farmer farmer_server : DEBUG <- farming_info from peer ea61688b 192.168.178.255 +2021-05-05T16:02:06.660 farmer farmer_server : DEBUG <- farming_info from peer fe1eb730 255.255.255.255 +2021-05-05T16:02:06.675 farmer farmer_server : DEBUG <- farming_info from peer c3e3eb9a 100.100.100.100 +2021-05-05T16:04:14.938 farmer farmer_server : DEBUG <- farming_info from peer 20388420 127.0.0.1 +2021-05-05T16:04:15.169 farmer farmer_server : DEBUG <- farming_info from peer ea61688b 192.168.178.255 +2021-05-05T16:04:16.951 farmer farmer_server : DEBUG <- farming_info from peer c3e3eb9a 100.100.100.100 +2021-05-05T16:06:24.253 farmer farmer_server : DEBUG <- farming_info from peer 20388420 127.0.0.1 +2021-05-05T16:06:24.274 farmer farmer_server : DEBUG <- farming_info from peer ea61688b 192.168.178.255 +2021-05-05T16:06:24.297 farmer farmer_server : DEBUG <- farming_info from peer c3e3eb9a 100.100.100.100 +2021-05-05T16:08:32.378 farmer farmer_server : DEBUG <- farming_info from peer 20388420 127.0.0.1 +2021-05-05T16:08:32.420 farmer farmer_server : DEBUG <- farming_info from peer c3e3eb9a 100.100.100.100 +2021-05-05T16:08:32.660 farmer farmer_server : DEBUG <- farming_info from peer ea61688b 192.168.178.255 +2021-05-05T16:10:32.378 farmer farmer_server : DEBUG <- farming_info from peer 20388420 127.0.0.1 +2021-05-05T16:10:32.420 farmer farmer_server : DEBUG <- farming_info from peer c3e3eb9a 100.100.100.100 +2021-05-05T16:10:32.660 farmer farmer_server : DEBUG <- farming_info from peer ea61688b 192.168.178.255 diff --git a/tests/chia_log/logs/farmer_server/nominal.txt b/tests/chia_log/logs/farmer_server/nominal.txt new file mode 100644 index 0000000..0242f72 --- /dev/null +++ b/tests/chia_log/logs/farmer_server/nominal.txt @@ -0,0 +1,8 @@ +2021-05-05T16:02:06.632 farmer farmer_server : DEBUG <- farming_info from peer 20388420 127.0.0.1 +2021-05-05T16:02:06.656 farmer farmer_server : DEBUG <- farming_info from peer ea61688b 192.168.178.255 +2021-05-05T16:02:06.660 farmer farmer_server : DEBUG <- farming_info from peer fe1eb730 255.255.255.255 +2021-05-05T16:02:06.675 farmer farmer_server : DEBUG <- farming_info from peer c3e3eb9a 100.100.100.100 +2021-05-05T16:02:14.938 farmer farmer_server : DEBUG <- farming_info from peer 20388420 127.0.0.1 +2021-05-05T16:02:14.995 farmer farmer_server : DEBUG <- farming_info from peer fe1eb730 255.255.255.255 +2021-05-05T16:02:15.169 farmer farmer_server : DEBUG <- farming_info from peer ea61688b 192.168.178.255 +2021-05-05T16:02:16.951 farmer farmer_server : DEBUG <- farming_info from peer c3e3eb9a 100.100.100.100 diff --git a/tests/chia_log/parsers/test_farmer_server_parser.py b/tests/chia_log/parsers/test_farmer_server_parser.py new file mode 100644 index 0000000..9d06334 --- /dev/null +++ b/tests/chia_log/parsers/test_farmer_server_parser.py @@ -0,0 +1,49 @@ +# std +import unittest +from pathlib import Path + +# project +from src.chia_log.parsers import farmer_server_parser + + +class TestFarmerServerParser(unittest.TestCase): + def setUp(self) -> None: + self.parser = farmer_server_parser.FarmerServerParser() + self.example_logs_path = Path(__file__).resolve().parents[1] / "logs/farmer_server" + with open(self.example_logs_path / "nominal.txt") as f: + self.nominal_logs = f.read() + + def tearDown(self) -> None: + pass + + def testBasicParsing(self): + for logs in [self.nominal_logs]: + + # Check that important fields are correctly parsed + activity_messages = self.parser.parse(logs) + self.assertNotEqual(len(activity_messages), 0, "No log messages found") + + expected_peer_hashes = ["20388420", "ea61688b", "fe1eb730", "c3e3eb9a", + "20388420", "fe1eb730", "ea61688b", "c3e3eb9a"] + expected_ip_addresses = ["127.0.0.1", "192.168.178.255", "255.255.255.255", + "100.100.100.100", "127.0.0.1", "255.255.255.255", + "192.168.178.255", "100.100.100.100"] + + for msg, peer_hash, ip_addr in zip( + activity_messages, + expected_peer_hashes, + expected_ip_addresses, + ): + self.assertEqual(msg.peer_hash, peer_hash, "Peer hash don't match") + self.assertEqual(msg.ip_addr, ip_addr, "IP Addr don't match") + + # Check arithmetic with parsed timestamps works + prev_timestamp = activity_messages[0].timestamp + for msg in activity_messages[1:]: + seconds_since_last_activity = (msg.timestamp - prev_timestamp).seconds + self.assertLess(seconds_since_last_activity, 10, "Unexpected duration between farmer_server events") + prev_timestamp = msg.timestamp + + +if __name__ == "__main__": + unittest.main()