forked from andiburger/growatt2mqtt
-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathmodbus_tls.py
56 lines (43 loc) · 2.03 KB
/
modbus_tls.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import logging
from classes.protocol_settings import Registry_Type, protocol_settings
from pymodbus.client.sync import ModbusTlsClient
from .transport_base import transport_base
from configparser import SectionProxy
class modbus_udp(transport_base):
    """Transport that reads Modbus registers through a pymodbus ModbusTlsClient.

    NOTE(review): the class is named ``modbus_udp`` although the client it
    wraps is ``ModbusTlsClient`` -- presumably a leftover from a copied
    transport; confirm before renaming, since callers refer to this name.

    Settings keys read by the constructor:
        host      required; ValueError if missing or empty
        port      optional integer, falls back to the class default (502)
        certfile  required; ValueError if missing or empty
        keyfile   required; ValueError if missing or empty
        hostname  optional, falls back to the value of ``host``
    """

    port: int = 502
    host: str = ""
    hostname: str = ""

    # Both values are mandatory: __init__ raises ValueError when either is empty.
    certfile: str = ""
    keyfile: str = ""

    client: ModbusTlsClient

    def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None):
        """Validate the settings section and build the TLS client.

        Args:
            settings: configuration section holding the keys listed in the
                class docstring.
            protocolSettings: forwarded unchanged to the base-class
                constructor.

        Raises:
            ValueError: when ``host``, ``certfile`` or ``keyfile`` is empty.
        """
        # Each value is stored on the instance first and validated afterwards.
        self.host = settings.get("host", "")
        if not self.host:
            raise ValueError("Host is not set")

        self.port = settings.getint("port", self.port)

        self.certfile = settings.get("certfile", "")
        if not self.certfile:
            raise ValueError("certfile is not set")

        self.keyfile = settings.get("keyfile", "")
        if not self.keyfile:
            raise ValueError("keyfile is not set")

        # When no explicit hostname is configured, reuse the host value.
        self.hostname = settings.get("hostname", self.host)

        client_options = {
            "host": self.host,
            "hostname": self.hostname,
            "certfile": self.certfile,
            "keyfile": self.keyfile,
            "port": self.port,
            "timeout": 7,  # presumably seconds -- confirm against pymodbus
            "retries": 3,
        }
        self.client = ModbusTlsClient(**client_options)

        # The base constructor runs only after the client has been created.
        super().__init__(settings, protocolSettings=protocolSettings)

    def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs):
        """Read ``count`` registers beginning at ``start``.

        INPUT registers go through ``client.read_input_registers`` and HOLDING
        registers through ``client.read_holding_registers``; extra keyword
        arguments are passed along to that call. Any other registry type
        yields ``None`` without touching the client.
        """
        if registry_type == Registry_Type.INPUT:
            reader = self.client.read_input_registers
        elif registry_type == Registry_Type.HOLDING:
            reader = self.client.read_holding_registers
        else:
            return None

        return reader(start, count, **kwargs)

    def connect(self):
        """Connect the client, store the outcome in ``self.connected``, then
        run the base-class ``connect()``."""
        outcome = self.client.connect()
        self.connected = outcome
        super().connect()