Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic spam control / throttle #24122

Merged
merged 13 commits into from
Oct 2, 2024
Merged
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ Zigbee2MQTT is made up of three modules, each developed in its own Github projec
### Developing

Zigbee2MQTT uses TypeScript (partially for now). Therefore after making changes to files in the `lib/` directory you need to recompile Zigbee2MQTT. This can be done by executing `npm run build`. For faster development instead of running `npm run build` you can run `npm run build-watch` in another terminal session, this will recompile as you change files.
In first time before building you need to run `npm i --save-dev @types/node`
ivanfmartinez marked this conversation as resolved.
Show resolved Hide resolved
Before submitting changes run `npm run test-with-coverage`, `npm run pretty:check` and `npm run eslint`

## Supported devices

Expand Down
21 changes: 20 additions & 1 deletion lib/extension/receive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import assert from 'assert';
import bind from 'bind-decorator';
import debounce from 'debounce';
import stringify from 'json-stable-stringify-without-jsonify';
import throttle from 'throttleit';

import * as zhc from 'zigbee-herdsman-converters';

Expand All @@ -16,6 +17,7 @@ type DebounceFunction = (() => void) & {clear(): void} & {flush(): void};
export default class Receive extends Extension {
private elapsed: {[s: string]: number} = {};
private debouncers: {[s: string]: {payload: KeyValue; publish: DebounceFunction}} = {};
private throttlers: {[s: string]: {publish: PublishEntityState}} = {};

async start(): Promise<void> {
this.eventBus.onPublishEntityState(this, this.onPublishEntityState);
Expand Down Expand Up @@ -68,6 +70,20 @@ export default class Receive extends Extension {
this.debouncers[device.ieeeAddr].publish();
}

async publishThrottle(device: Device, payload: KeyValue, time: number): Promise<void> {
if (!this.throttlers[device.ieeeAddr]) {
this.throttlers[device.ieeeAddr] = {
publish: throttle(this.publishEntityState, time * 1000),
};
}

// Update state cache right away. This makes sure that during throttling cached state is always up to date.
// By updating cache we make sure that state cache is always up-to-date.
this.state.set(device, payload);

await this.throttlers[device.ieeeAddr].publish(device, payload, 'publishThrottle');
}

// if debounce_ignore are specified (Array of strings)
// then all newPayload values with key present in debounce_ignore
// should equal or be undefined in oldPayload
Expand Down Expand Up @@ -130,9 +146,12 @@ export default class Receive extends Extension {
this.elapsed[data.device.ieeeAddr] = now;
}

// Check if we have to debounce
// Check if we have to debounce or throttle
if (data.device.options.debounce) {
Koenkk marked this conversation as resolved.
Show resolved Hide resolved
this.publishDebounce(data.device, payload, data.device.options.debounce, data.device.options.debounce_ignore);
} else if (data.device.options.throttle || (data.device.options.description && data.device.options.description.includes('SPAMMER'))) {
ivanfmartinez marked this conversation as resolved.
Show resolved Hide resolved
const throttleTime = data.device.options.throttle || 30;
await this.publishThrottle(data.device, payload, throttleTime);
} else {
await this.publishEntityState(data.device, payload);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/types/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ declare global {
properties?: {messageExpiryInterval: number};
}
type Scene = {id: number; name: string};
type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached';
type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached' | 'publishThrottle';
type PublishEntityState = (entity: Device | Group, payload: KeyValue, stateChangeReason?: StateChangeReason) => Promise<void>;
type RecursivePartial<T> = {[P in keyof T]?: RecursivePartial<T[P]>};
interface KeyValue {
Expand Down Expand Up @@ -232,6 +232,7 @@ declare global {
retrieve_state?: boolean;
debounce?: number;
debounce_ignore?: string[];
throttle?: number;
filtered_attributes?: string[];
filtered_cache?: string[];
filtered_optimistic?: string[];
Expand Down
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"rimraf": "^6.0.1",
"semver": "^7.6.3",
"source-map-support": "^0.5.21",
"throttleit": "^2.1.0",
"uri-js": "^4.4.1",
"winston": "^3.14.2",
"winston-syslog": "^2.7.1",
Expand Down
146 changes: 146 additions & 0 deletions test/receive.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,152 @@ describe('Receive', () => {
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.09, humidity: 0.01, pressure: 2});
});

it('Should ignore multiple messages from spamming devices', async () => {
const device = zigbeeHerdsman.devices.SPAMMER1;
const throttle_for_testing = 1;
settings.set(['device_options', 'throttle'], throttle_for_testing);
settings.set(['device_options', 'retain'], true);
settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer1');
const data1 = {measuredValue: 1};
const payload1 = {
data: data1,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload1);
const data2 = {measuredValue: 2};
const payload2 = {
data: data2,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload2);
const data3 = {measuredValue: 3};
const payload3 = {
data: data3,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload3);
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledTimes(1);
await flushPromises();
expect(MQTT.publish).toHaveBeenCalledTimes(1);
expect(MQTT.publish.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01});
expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true});

// Now we try after elapsed time to see if it publishes next message
const timeshift = throttle_for_testing * 2000;
jest.advanceTimersByTime(timeshift);
expect(MQTT.publish).toHaveBeenCalledTimes(2);
await flushPromises();

expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03});
expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true});

const data4 = {measuredValue: 4};
const payload4 = {
data: data4,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload4);
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledTimes(3);
expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04});
expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true});
});

it('Should ignore multiple messages from spamming devices defined by description', async () => {
const device = zigbeeHerdsman.devices.SPAMMER2;
const throttle_for_testing = 50;
settings.set(['device_options', 'retain'], true);
settings.set(['devices', device.ieeeAddr, 'description'], 'this is a SPAMMER device');
settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer2');
const data1 = {measuredValue: 1};
const payload1 = {
data: data1,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload1);
const data2 = {measuredValue: 2};
const payload2 = {
data: data2,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload2);
const data3 = {measuredValue: 3};
const payload3 = {
data: data3,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload3);
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledTimes(1);
await flushPromises();
jest.advanceTimersByTime(throttle_for_testing * 1000);
expect(MQTT.publish).toHaveBeenCalledTimes(2);

expect(MQTT.publish.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer2');
expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01});
expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true});

expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer2');
expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03});
expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true});

// Now we try again after elapsed time to see if it publishes
const timeshift = throttle_for_testing * 2000;
jest.advanceTimersByTime(timeshift);

const data4 = {measuredValue: 4};
const payload4 = {
data: data4,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload4);
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledTimes(3);
expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer2');
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04});
expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true});
});

it('Shouldnt republish old state', async () => {
// https://github.com/Koenkk/zigbee2mqtt/issues/3572
const device = zigbeeHerdsman.devices.bulb;
Expand Down
3 changes: 3 additions & 0 deletions test/stub/zigbeeHerdsman.js
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ const devices = {
'lumi.sensor_86sw2.es1',
),
WSDCGQ11LM: new Device('EndDevice', '0x0017880104e45522', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'),
// These are not a real spammer devices, just copy of previous to test the spam filter
SPAMMER1: new Device('EndDevice', '0x0017880104e455fe', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'),
SPAMMER2: new Device('EndDevice', '0x0017880104e455ff', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'),
RTCGQ11LM: new Device('EndDevice', '0x0017880104e45523', 6540, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.sensor_motion.aq2'),
ZNCZ02LM: ZNCZ02LM,
E1743: new Device('Router', '0x0017880104e45540', 6540, 4476, [new Endpoint(1, [0], [])], true, 'Mains (single phase)', 'TRADFRI on/off switch'),
Expand Down