Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial working commands #1

Merged
merged 12 commits into from
May 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ insert_final_newline = false

[*.yml]
indent_size = 2

[*.yaml]
indent_size = 2
7 changes: 5 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,22 @@ jobs:
- name: 'Bootstrap'
# The first CLI call will create the .venv
run: |
./dev-cli.py version
./dev-cli.py create-default-settings
./cli.py version

- name: 'CLI help'
run: |
./dev-cli.py --help
./cli.py --help

- name: 'Safety'
run: |
./cli.py safety
./dev-cli.py safety

- name: 'Run tests with Python v${{ matrix.python-version }}'
run: |
./cli.py coverage
./dev-cli.py coverage

- name: 'Upload coverage report'
uses: codecov/codecov-action@v3
Expand Down
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,74 @@ Get values from modbus energy meter to MQTT / HomeAssistant


Energy Meter -> modbus -> RS485-USB-Adapter -> energymeter2mqtt -> MQTT -> Home Assistant


The current focus is on the energy meter "Saia PCD ALD1D5FD"
However, the code is kept flexible, so that similar meters can be quickly put into operation.



# start development

```bash
~$ git clone https://github.com/jedie/energymeter2mqtt.git
~$ cd inverter-connect
~/energymeter2mqtt$ ./dev-cli.py --help
```


# dev CLI

[comment]: <> (✂✂✂ auto generated dev help start ✂✂✂)
```
Usage: ./dev-cli.py [OPTIONS] COMMAND [ARGS]...

╭─ Options ────────────────────────────────────────────────────────────────────────────────────────╮
│ --help Show this message and exit. │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ───────────────────────────────────────────────────────────────────────────────────────╮
│ check-code-style Check code style by calling darker + flake8 │
│ coverage Run and show coverage. │
│ create-default-settings Create a default user settings file. (Used by CI pipeline ;) │
│ fix-code-style Fix code style of all inverter source code files via darker │
│ install Run pip-sync and install 'inverter' via pip as editable. │
│ mypy Run Mypy (configured in pyproject.toml) │
│ publish Build and upload this project to PyPi │
│ safety Run safety check against current requirements files │
│ test Run unittests │
│ tox Run tox │
│ update Update "requirements*.txt" dependencies files │
│ update-test-snapshot-files Update all test snapshot files (by remove and recreate all snapshot │
│ files) │
│ version Print version and exit │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
```
[comment]: <> (✂✂✂ auto generated dev help end ✂✂✂)


# app CLI

[comment]: <> (✂✂✂ auto generated main help start ✂✂✂)
```
Usage: ./cli.py [OPTIONS] COMMAND [ARGS]...

╭─ Options ────────────────────────────────────────────────────────────────────────────────────────╮
│ --help Show this message and exit. │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ───────────────────────────────────────────────────────────────────────────────────────╮
│ debug-settings Display (anonymized) MQTT server username and password │
│ edit-settings Edit the settings file. On first call: Create the default one. │
│ print-registers Print RAW modbus register data │
│ print-values Print all values from the definition in endless loop │
│ publish-loop Publish all values via MQTT to Home Assistant in a endless loop. │
│ systemd-debug Print Systemd service template + context + rendered file content. │
│ systemd-remove Write Systemd service file, enable it and (re-)start the service. (May need │
│ sudo) │
│ systemd-setup Write Systemd service file, enable it and (re-)start the service. (May need │
│ sudo) │
│ systemd-status Display status of systemd service. (May need sudo) │
│ systemd-stop Stops the systemd service. (May need sudo) │
│ version Print version and exit │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
```
[comment]: <> (✂✂✂ auto generated main help end ✂✂✂)
6 changes: 3 additions & 3 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ def print_no_pip_error():
FILE_EXT = ''

BASE_PATH = Path(__file__).parent
VENV_PATH = BASE_PATH / '.venv'
VENV_PATH = BASE_PATH / '.venv-app'
BIN_PATH = VENV_PATH / BIN_NAME
PYTHON_PATH = BIN_PATH / f'python{FILE_EXT}'
PIP_PATH = BIN_PATH / f'pip{FILE_EXT}'
PIP_SYNC_PATH = BIN_PATH / f'pip-sync{FILE_EXT}'

DEP_LOCK_PATH = BASE_PATH / 'requirements.dev.txt'
DEP_LOCK_PATH = BASE_PATH / 'requirements.txt'
DEP_HASH_PATH = VENV_PATH / '.dep_hash'

# script file defined in pyproject.toml as [console_scripts]
# (Under Windows: ".exe" not added!)
PROJECT_SHELL_SCRIPT = BIN_PATH / 'energymeter2mqtt'
PROJECT_SHELL_SCRIPT = BIN_PATH / 'energymeter2mqtt_app'


def get_dep_hash():
Expand Down
115 changes: 115 additions & 0 deletions dev-cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#!/usr/bin/env python3

"""
bootstrap CLI
~~~~~~~~~~~~~

Just call this file, and the magic happens ;)
"""

import hashlib
import subprocess
import sys
import venv
from pathlib import Path


def print_no_pip_error():
print('Error: Pip not available!')
print('Hint: "apt-get install python3-venv"\n')


try:
from ensurepip import version
except ModuleNotFoundError as err:
print(err)
print('-' * 100)
print_no_pip_error()
raise
else:
if not version():
print_no_pip_error()
sys.exit(-1)


assert sys.version_info >= (3, 9), 'Python version is too old!'


if sys.platform == 'win32': # wtf
# Files under Windows, e.g.: .../.venv/Scripts/python.exe
BIN_NAME = 'Scripts'
FILE_EXT = '.exe'
else:
# Files under Linux/Mac and all other than Windows, e.g.: .../.venv/bin/python
BIN_NAME = 'bin'
FILE_EXT = ''

BASE_PATH = Path(__file__).parent
VENV_PATH = BASE_PATH / '.venv'
BIN_PATH = VENV_PATH / BIN_NAME
PYTHON_PATH = BIN_PATH / f'python{FILE_EXT}'
PIP_PATH = BIN_PATH / f'pip{FILE_EXT}'
PIP_SYNC_PATH = BIN_PATH / f'pip-sync{FILE_EXT}'

DEP_LOCK_PATH = BASE_PATH / 'requirements.dev.txt'
DEP_HASH_PATH = VENV_PATH / '.dep_hash'

# script file defined in pyproject.toml as [console_scripts]
# (Under Windows: ".exe" not added!)
PROJECT_SHELL_SCRIPT = BIN_PATH / 'energymeter2mqtt_dev'


def get_dep_hash():
"""Get SHA512 hash from poetry.lock content."""
return hashlib.sha512(DEP_LOCK_PATH.read_bytes()).hexdigest()


def store_dep_hash():
"""Generate .venv/.dep_hash"""
DEP_HASH_PATH.write_text(get_dep_hash())


def venv_up2date():
"""Is existing .venv is up-to-date?"""
if DEP_HASH_PATH.is_file():
return DEP_HASH_PATH.read_text() == get_dep_hash()
return False


def verbose_check_call(*popen_args):
print(f'\n+ {" ".join(str(arg) for arg in popen_args)}\n')
return subprocess.check_call(popen_args)


def main(argv):
assert DEP_LOCK_PATH.is_file(), f'File not found: "{DEP_LOCK_PATH}" !'

# Create virtual env in ".venv/":
if not PYTHON_PATH.is_file():
print('Create virtual env here:', VENV_PATH.absolute())
builder = venv.EnvBuilder(symlinks=True, upgrade=True, with_pip=True)
builder.create(env_dir=VENV_PATH)
# Update pip
verbose_check_call(PYTHON_PATH, '-m', 'pip', 'install', '-U', 'pip')

if not PIP_SYNC_PATH.is_file():
# Install pip-tools
verbose_check_call(PYTHON_PATH, '-m', 'pip', 'install', '-U', 'pip-tools')

if not PROJECT_SHELL_SCRIPT.is_file() or not venv_up2date():
# install requirements via "pip-sync"
verbose_check_call(PIP_SYNC_PATH, str(DEP_LOCK_PATH))

# install project
verbose_check_call(PIP_PATH, 'install', '--no-deps', '-e', '.')
store_dep_hash()

# Call our entry point CLI:
try:
verbose_check_call(PROJECT_SHELL_SCRIPT, *sys.argv[1:])
except subprocess.CalledProcessError as err:
sys.exit(err.returncode)


if __name__ == '__main__':
main(sys.argv)
2 changes: 1 addition & 1 deletion energymeter2mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
Get values from modbus energy meter to MQTT / HomeAssistant
"""

__version__ = '0.0.1'
__version__ = '0.1.0'
__author__ = 'Jens Diemer <git@jensdiemer.de>'
79 changes: 79 additions & 0 deletions energymeter2mqtt/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import logging
from decimal import Decimal

from ha_services.mqtt4homeassistant.data_classes import HaValue
from pymodbus.client import ModbusSerialClient
from pymodbus.exceptions import ModbusException
from pymodbus.framer.rtu_framer import ModbusRtuFramer
from pymodbus.pdu import ExceptionResponse
from pymodbus.register_read_message import ReadHoldingRegistersResponse
from rich.pretty import pprint

from energymeter2mqtt.user_settings import EnergyMeter


logger = logging.getLogger(__name__)


def get_modbus_client(energy_meter: EnergyMeter, definitions: dict, verbosity: int) -> ModbusSerialClient:
conn_settings = definitions['connection']

print(f'Connect to {energy_meter.port}...')
conn_kwargs = dict(
baudrate=conn_settings['baudrate'],
bytesize=conn_settings['bytesize'],
parity=conn_settings['parity'],
stopbits=conn_settings['stopbits'],
timeout=energy_meter.timeout,
retry_on_empty=energy_meter.retry_on_empty,
)
if verbosity:
print('Connection arguments:')
pprint(conn_kwargs)

client = ModbusSerialClient(energy_meter.port, framer=ModbusRtuFramer, broadcast_enable=False, **conn_kwargs)
if verbosity > 1:
print('connected:', client.connect())
print(client)

return client


def get_ha_values(*, client, parameters, slave_id) -> list[HaValue]:
values = []
for parameter in parameters:
logger.debug('Parameters: %r', parameter)
parameter_name = parameter['name']
address = parameter['register']
count = parameter.get('count', 1)
logger.debug('Read register %i (dez, count: %i, slave id: %i)', address, count, slave_id)

response = client.read_holding_registers(address=address, count=count, slave=slave_id)
if isinstance(response, (ExceptionResponse, ModbusException)):
logger.error(
'Error read register %i (dez, count: %i, slave id: %i): %s', address, count, slave_id, response
)
else:
assert isinstance(response, ReadHoldingRegistersResponse), f'{response=}'
registers = response.registers
logger.debug('Register values: %r', registers)
value = registers[0]
if count > 1:
value += registers[1] * 100000

if scale := parameter.get('scale'):
logger.debug('Scale %s: %r * %r', parameter_name, value, scale)
scale = Decimal(str(scale))
value = float(value * scale)
logger.debug('Scaled %s results in: %r', parameter_name, value)

ha_value = HaValue(
name=parameter_name,
value=value,
device_class=parameter['class'],
state_class=parameter['state_class'],
unit=parameter['uom'],
)
logger.debug('HA-Value: %s', ha_value)
values.append(ha_value)
return values
Loading