-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrouter_ping.py
177 lines (148 loc) · 6.65 KB
/
router_ping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python3
import configparser
import time
import logging
import subprocess
import requests
import sys
import os
from datetime import datetime
# Setup logging
# Log to the console always, and to the log file when it can be opened.
# Opening the file fails (PermissionError / FileNotFoundError, both OSError)
# when the process may not write to /var/log; in that case keep running with
# console logging only instead of crashing before anything can be reported.
_LOG_FILE = '/var/log/router_ping.log'
_log_handlers = [logging.StreamHandler()]
_log_file_error = None
try:
    _log_handlers.append(logging.FileHandler(_LOG_FILE))
except OSError as exc:
    # Remember the error; it is reported once logging is configured.
    _log_file_error = exc

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)
if _log_file_error is not None:
    logger.warning(
        "Cannot open log file %s (%s); logging to console only",
        _LOG_FILE,
        _log_file_error,
    )
class RouterPing:
    """Ping a host periodically and power-cycle a Tasmota plug when it stops answering.

    Settings are read from an INI file with a ``General`` section (ping
    target, timing values, failure threshold) and a ``Tasmota`` section (plug
    address, relay index and optional ``username``/``password``).
    """

    # Sections and keys that must be present in the configuration file.
    REQUIRED_KEYS = {
        'General': ['ping_address', 'ping_interval', 'max_failures', 'ping_timeout', 'restart_delay'],
        'Tasmota': ['plug_ip', 'relay'],
    }
    # ``General`` keys that the other methods convert with int().
    INTEGER_KEYS = ('ping_interval', 'max_failures', 'ping_timeout', 'restart_delay')

    # Seconds allowed for each HTTP request to the plug.
    HTTP_TIMEOUT = 5
    # Number of attempts, and pause between them, when switching the plug
    # back ON after it has been switched OFF.
    POWER_ON_ATTEMPTS = 3
    POWER_ON_RETRY_DELAY = 5
    # Seconds granted to the ping process on top of its own reply timeout.
    PING_GRACE = 5
    # Seconds to wait after an unexpected error in the main loop.
    ERROR_BACKOFF = 60

    def __init__(self, config_path):
        """Initialize with the path to the configuration file.

        Loads the configuration immediately; the process exits with status 1
        when the file cannot be loaded (see ``load_config``).
        """
        self.config_path = config_path
        self.config = None
        self.failure_count = 0
        self.load_config()

    def load_config(self, exit_on_error=True):
        """Load and validate the configuration from the INI file.

        The new settings replace ``self.config`` only after they passed
        validation, so a failed load never leaves a half-checked
        configuration behind.

        Args:
            exit_on_error: When True (the default) a load failure terminates
                the process with status 1. When False the failure is logged,
                the previous configuration is kept and False is returned.

        Returns:
            True when the configuration was loaded, False otherwise.
        """
        try:
            parser = configparser.ConfigParser()
            # read() skips files it cannot open and returns the list of files
            # it parsed, so an empty result means the file was not readable.
            if not parser.read(self.config_path):
                raise ValueError(f"Cannot read configuration file: {self.config_path}")

            # Validate required configuration
            for section, keys in self.REQUIRED_KEYS.items():
                if section not in parser:
                    raise ValueError(f"Missing section: {section}")
                for key in keys:
                    if key not in parser[section]:
                        raise ValueError(f"Missing key: {key} in section {section}")

            # Reject non-integer values here rather than on first use.
            for key in self.INTEGER_KEYS:
                raw_value = parser['General'][key]
                try:
                    int(raw_value)
                except ValueError:
                    raise ValueError(
                        f"Invalid integer for {key} in section General: {raw_value!r}"
                    ) from None
        except Exception as e:
            logger.error(f"Failed to load configuration: {e}")
            if exit_on_error:
                sys.exit(1)
            return False

        self.config = parser
        logger.info(f"Configuration loaded from {self.config_path}")
        return True

    def ping(self, address, timeout):
        """Ping the specified address once and return True if successful.

        Args:
            address: Host name or IP address handed to the system ``ping``.
            timeout: Reply timeout passed to ``ping -W``.

        Returns:
            True when ``ping`` exits with status 0; False when it exits with
            another status, cannot be started, or does not finish in time.
        """
        try:
            # Hard limit for the whole process, so a ping that stalls cannot
            # block the monitoring loop indefinitely.
            hard_limit = float(timeout) + self.PING_GRACE
            # Using subprocess to call the system ping command
            # -c 1: send only 1 packet
            # -W timeout: timeout in seconds
            result = subprocess.run(
                ["ping", "-c", "1", "-W", str(timeout), address],
                # The output is never inspected, only the exit status.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
                timeout=hard_limit,
            )
        except subprocess.TimeoutExpired:
            logger.error(f"Ping to {address} did not finish within {hard_limit:g} seconds")
            return False
        except Exception as e:
            logger.error(f"Error during ping: {e}")
            return False
        return result.returncode == 0

    @staticmethod
    def _power_state(payload, relay):
        """Return the relay state ("ON", "OFF", ...) from a decoded plug reply.

        Looks up ``POWER<relay>`` first and falls back to the un-indexed
        ``POWER`` key. Returns an empty string when neither key is present
        or the reply is not a JSON object.
        """
        if not isinstance(payload, dict):
            return ""
        # NOTE(review): single-relay Tasmota devices are understood to answer
        # with "POWER" rather than "POWER1" - confirm against the firmware in use.
        state = payload.get(f"POWER{relay}", payload.get("POWER", ""))
        return str(state).upper()

    def _power_on_with_retry(self, on_url, auth):
        """Switch the plug back ON, retrying so a single failed request does
        not leave it switched off. Returns True once a request succeeded."""
        attempts = self.POWER_ON_ATTEMPTS
        for attempt in range(1, attempts + 1):
            try:
                response = requests.get(on_url, auth=auth, timeout=self.HTTP_TIMEOUT)
                response.raise_for_status()
            except Exception as e:
                logger.error(
                    f"Failed to turn Tasmota plug back ON (attempt {attempt}/{attempts}): {e}"
                )
                if attempt < attempts:
                    time.sleep(self.POWER_ON_RETRY_DELAY)
            else:
                logger.info("Turned Tasmota plug back ON")
                return True
        return False

    def toggle_tasmota(self):
        """
        Check the current state of the Tasmota plug.
        If it's ON, turn it OFF, wait for restart_delay seconds, then turn it back ON.
        If it's already OFF, or its state cannot be determined, do nothing.

        Returns:
            True when the plug was switched OFF and back ON; False when no
            action was taken or any step failed. Errors are logged, not raised.
        """
        try:
            tasmota = self.config['Tasmota']
            plug_ip = tasmota['plug_ip']
            relay = tasmota['relay']
            username = tasmota.get('username', '')
            password = tasmota.get('password', '')
            restart_delay = int(self.config['General'].get('restart_delay', 30))

            # Credentials are only sent when both parts are configured.
            auth = (username, password) if username and password else None
            base_url = f"http://{plug_ip}/cm?cmnd=Power{relay}"

            # First, get the current state
            response = requests.get(base_url, auth=auth, timeout=self.HTTP_TIMEOUT)
            response.raise_for_status()
            payload = response.json()
            current_state = self._power_state(payload, relay)

            # Only proceed if the plug is currently ON
            if current_state != "ON":
                if current_state == "OFF":
                    logger.info("Tasmota plug is already OFF, no action taken")
                else:
                    # Do not report an unreadable reply as "OFF".
                    logger.warning(
                        f"Could not determine Tasmota plug state from response {payload!r}, "
                        "no action taken"
                    )
                return False

            # Turn it OFF
            response = requests.get(f"{base_url}%20OFF", auth=auth, timeout=self.HTTP_TIMEOUT)
            response.raise_for_status()
            logger.info("Turned Tasmota plug OFF")
        except Exception as e:
            logger.error(f"Failed to toggle Tasmota plug: {e}")
            return False

        # From here on the plug is OFF, so switching it back ON must be
        # attempted even if something below misbehaves.
        logger.info(f"Waiting {restart_delay} seconds before turning plug back ON")
        time.sleep(max(restart_delay, 0))
        return self._power_on_with_retry(f"{base_url}%20ON", auth)

    def run(self):
        """Main loop to ping and toggle if needed.

        Runs until the process is terminated. After every cycle, including
        one that ended in an error, the configuration file is re-read; when
        the re-read fails the previous configuration stays in effect.
        """
        logger.info("Starting RouterPing service")
        while True:
            try:
                general = self.config['General']
                ping_address = general['ping_address']
                ping_interval = int(general['ping_interval'])
                max_failures = int(general['max_failures'])
                ping_timeout = int(general['ping_timeout'])

                # Perform the ping
                if self.ping(ping_address, ping_timeout):
                    if self.failure_count > 0:
                        logger.info(f"Ping successful to {ping_address}. Resetting failure count.")
                    self.failure_count = 0
                else:
                    self.failure_count += 1
                    logger.warning(f"Ping failed to {ping_address}. Failure count: {self.failure_count}/{max_failures}")
                    if self.failure_count >= max_failures:
                        logger.warning(f"Maximum failures reached ({max_failures}). Toggling Tasmota plug.")
                        if self.toggle_tasmota():
                            # Reset failure count after successful toggle
                            self.failure_count = 0

                # Sleep until the next interval
                time.sleep(ping_interval)
            except Exception as e:
                logger.error(f"Error in main loop: {e}")
                time.sleep(self.ERROR_BACKOFF)  # Back off before the next cycle

            # Reload config periodically to pick up changes. This also runs
            # after an error so that a corrected file is picked up, and it
            # must not stop the watchdog when the file is broken mid-edit.
            self.load_config(exit_on_error=False)
if __name__ == "__main__":
    # The first command-line argument, when present, replaces the default
    # configuration path.
    config_path = sys.argv[1] if len(sys.argv) > 1 else "/etc/router_ping.ini"

    if os.path.exists(config_path):
        # Build the watchdog from the configuration and enter its main loop.
        router_ping = RouterPing(config_path)
        router_ping.run()
    else:
        # Without a configuration file there is nothing to monitor.
        logger.error(f"Configuration file not found: {config_path}")
        sys.exit(1)