|
2 | 2 |
|
3 | 3 | import asyncio
|
4 | 4 | import logging
|
| 5 | +import math |
| 6 | +import statistics |
5 | 7 | import time
|
6 | 8 | from typing import Any
|
7 | 9 |
|
@@ -181,15 +183,41 @@ async def energy_scan(
|
181 | 183 | self, channels: zigpy.types.Channels, duration_exp: int, count: int
|
182 | 184 | ) -> dict[int, float]:
|
183 | 185 | """Runs an energy detection scan and returns the per-channel scan results."""
|
| 186 | + all_results = {} |
184 | 187 |
|
185 |
| - if duration_exp: |
186 |
| - energy = await self._api._at_command("ED", duration_exp) |
187 |
| - else: |
188 |
| - energy = await self._api._at_command("ED") |
| 188 | + for _ in range(count): |
| 189 | + if duration_exp: |
| 190 | + results = await self._api._at_command("ED", duration_exp) |
| 191 | + else: |
| 192 | + results = await self._api._at_command("ED") |
| 193 | + results = { |
| 194 | + channel: -int(rssi) for channel, rssi in zip(range(11, 27), results) |
| 195 | + } |
189 | 196 |
|
190 |
| - energy = {c: -int(e) for c, e in zip(range(11, 27), energy)} |
| 197 | + for channel, rssi in results.items(): |
| 198 | + all_results.setdefault(channel, []).append(rssi) |
| 199 | + |
| 200 | + def logistic(x: float, *, L: float = 1, x_0: float = 0, k: float = 1) -> float: |
| 201 | + """Logistic function.""" |
| 202 | + return L / (1 + math.exp(-k * (x - x_0))) |
| 203 | + |
| 204 | + def map_rssi_to_energy(rssi: int) -> float: |
| 205 | + """Remaps RSSI (in dBm) to Energy (0-255).""" |
| 206 | + RSSI_MAX = -5 |
| 207 | + RSSI_MIN = -92 |
| 208 | + return logistic( |
| 209 | + x=rssi, |
| 210 | + L=255, |
| 211 | + x_0=RSSI_MIN + 0.45 * (RSSI_MAX - RSSI_MIN), |
| 212 | + k=0.13, |
| 213 | + ) |
| 214 | + |
| 215 | + energy = { |
| 216 | + channel: map_rssi_to_energy(statistics.mean(all_rssi)) |
| 217 | + for channel, all_rssi in all_results.items() |
| 218 | + } |
191 | 219 |
|
192 |
| - return {c: energy.get(c, 0) for c in channels} |
| 220 | + return {channel: energy.get(channel, 0) for channel in channels} |
193 | 221 |
|
194 | 222 | async def force_remove(self, dev):
|
195 | 223 | """Forcibly remove device from NCP."""
|
|
0 commit comments